This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 72b45f3114 NIFI-13202 Removed Accumulo Processors and Services This
closes #8794.
72b45f3114 is described below
commit 72b45f311488a40e518e97a039dd7ecc5378a3f3
Author: exceptionfactory <[email protected]>
AuthorDate: Thu May 9 15:45:16 2024 -0500
NIFI-13202 Removed Accumulo Processors and Services
This closes #8794.
Signed-off-by: Joseph Witt <[email protected]>
---
nifi-assembly/pom.xml | 30 -
nifi-code-coverage/pom.xml | 10 -
nifi-docs/src/main/asciidoc/developer-guide.adoc | 1 -
.../nifi-accumulo-bundle/README.md | 23 -
.../nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml | 41 --
.../nifi-accumulo-processors/pom.xml | 66 ---
.../accumulo/data/AccumuloRecordConfiguration.java | 159 -----
.../org/apache/nifi/accumulo/data/KeySchema.java | 136 -----
.../accumulo/processors/BaseAccumuloProcessor.java | 86 ---
.../accumulo/processors/PutAccumuloRecord.java | 658 ---------------------
.../nifi/accumulo/processors/ScanAccumulo.java | 390 ------------
.../services/org.apache.nifi.processor.Processor | 16 -
.../nifi-accumulo-services-api-nar/pom.xml | 40 --
.../nifi-accumulo-services-api/pom.xml | 35 --
.../controllerservices/BaseAccumuloService.java | 33 --
.../nifi-accumulo-services-nar/pom.xml | 41 --
.../nifi-accumulo-services/pom.xml | 70 ---
.../controllerservices/AccumuloService.java | 324 ----------
.../org.apache.nifi.controller.ControllerService | 15 -
.../controllerservices/TestAccumuloService.java | 214 -------
.../nifi-accumulo-bundle/pom.xml | 85 ---
nifi-extension-bundles/pom.xml | 1 -
22 files changed, 2474 deletions(-)
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 6501ad0e66..e06bf20559 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -1094,36 +1094,6 @@ language governing permissions and limitations under the
License. -->
</dependency>
</dependencies>
</profile>
- <profile>
- <id>include-accumulo</id>
- <!-- This profile handles the inclusion of nifi-accumulo
artifacts. -->
- <activation>
- <activeByDefault>false</activeByDefault>
- <property>
- <name>allProfiles</name>
- </property>
- </activation>
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-nar</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <type>nar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-services-api-nar</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <type>nar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-services-nar</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <type>nar</type>
- </dependency>
- </dependencies>
- </profile>
<profile>
<id>targz</id>
<activation>
diff --git a/nifi-code-coverage/pom.xml b/nifi-code-coverage/pom.xml
index c8e3f5a23e..50e5767338 100644
--- a/nifi-code-coverage/pom.xml
+++ b/nifi-code-coverage/pom.xml
@@ -701,16 +701,6 @@
</dependency>
<!-- NAR Bundles -->
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-processors</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-services</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-airtable-processors</artifactId>
diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc
b/nifi-docs/src/main/asciidoc/developer-guide.adoc
index 5f6cc76a27..7dd0ddb961 100644
--- a/nifi-docs/src/main/asciidoc/developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc
@@ -2698,7 +2698,6 @@ deprecationLogger.warn(
[options="header,footer"]
|==================================================================================================================================================
| Package | Maven Profile | Description
-| Apache Accumulo Bundle | include-accumulo | Adds support for
https://accumulo.apache.org[Apache Accumulo].
| Apache Hadoop Bundle | include-hadoop | Adds support for Apache
Hadoop with HDFS and Parquet components
| Apache HBase Bundle | include-hbase | Adds support for Apache
HBase
| Apache IoTDB Bundle | include-iotdb | Adds support for Apache
IoTDB
diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/README.md
b/nifi-extension-bundles/nifi-accumulo-bundle/README.md
deleted file mode 100644
index e431586c24..0000000000
--- a/nifi-extension-bundles/nifi-accumulo-bundle/README.md
+++ /dev/null
@@ -1,23 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-# nifi-accumulo
-
-This is a basic NiFi->Accumulo integration. Running `mvn install` will create
your NAR, which can be added
-to Apache NiFi. This is intended to be created with Apache Accumulo 2.x.
-
-The resulting NAR will be named 'nifi-accumulo-nar'
-
-
-Note that some of this code was modeled after the HBase work.
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml
deleted file mode 100644
index 000091b414..0000000000
--- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-bundle</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-accumulo-nar</artifactId>
- <packaging>nar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-processors</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-services-api-nar</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <type>nar</type>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
deleted file mode 100644
index b1e3d370da..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
+++ /dev/null
@@ -1,66 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-bundle</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-accumulo-processors</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-services-api</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client-api</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-record</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-record-serialization-service-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-record-path</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- </dependencies>
-</project>
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java
deleted file mode 100644
index 164f9d5284..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
-
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.accumulo.data;
-
-/**
- * Encapsulates configuring the session with some required parameters.
- *
- * Justification: Generally not a fan of this fluent API to configure other
objects, but there is a lot encapsulated here
- * so it helps minimize what we pass between the current set of classes and
the upcoming features.
- */
-public class AccumuloRecordConfiguration {
- private String tableName;
- private String rowFieldName;
- private String columnFamily;
- private String columnFamilyField;
- private String timestampField;
- private String fieldDelimiter;
- private boolean encodeFieldDelimiter;
- private boolean qualifierInKey;
- private boolean deleteKeys;
-
-
- protected AccumuloRecordConfiguration(final String tableName, final String
rowFieldName, final String columnFamily,
- final String columnFamilyField,
- final String timestampField, final
String fieldDelimiter,
- final boolean encodeFieldDelimiter,
- final boolean qualifierInKey, final
boolean deleteKeys) {
- this.tableName = tableName;
- this.rowFieldName = rowFieldName;
- this.columnFamily = columnFamily;
- this.columnFamilyField = columnFamilyField;
- this.timestampField = timestampField;
- this.fieldDelimiter = fieldDelimiter;
- this.encodeFieldDelimiter = encodeFieldDelimiter;
- this.qualifierInKey = qualifierInKey;
- this.deleteKeys = deleteKeys;
- }
-
- public String getTableName(){
- return tableName;
- }
-
- public String getColumnFamily() {
- return columnFamily;
- }
-
- public String getColumnFamilyField() {
- return columnFamilyField;
- }
-
- public boolean getEncodeDelimiter(){
- return encodeFieldDelimiter;
- }
-
- public String getTimestampField(){
-
- return timestampField;
- }
-
- public String getFieldDelimiter(){
- return fieldDelimiter;
- }
-
- public boolean getQualifierInKey(){
- return qualifierInKey;
- }
-
- public boolean isDeleteKeys(){
- return deleteKeys;
- }
-
-
- public String getRowField(){
- return rowFieldName;
- }
-
- public static class Builder{
-
- public static final Builder newBuilder(){
- return new Builder();
- }
-
- public Builder setRowField(final String rowFieldName){
- this.rowFieldName = rowFieldName;
- return this;
- }
-
- public Builder setTableName(final String tableName){
- this.tableName = tableName;
- return this;
- }
-
- public Builder setEncodeFieldDelimiter(final boolean
encodeFieldDelimiter){
- this.encodeFieldDelimiter = encodeFieldDelimiter;
- return this;
- }
-
-
- public Builder setColumnFamily(final String columnFamily){
- this.columnFamily = columnFamily;
- return this;
- }
-
- public Builder setColumnFamilyField(final String columnFamilyField){
- this.columnFamilyField = columnFamilyField;
- return this;
- }
-
- public Builder setTimestampField(final String timestampField){
- this.timestampField = timestampField;
- return this;
- }
-
- public Builder setQualifierInKey(final boolean qualifierInKey){
- this.qualifierInKey = qualifierInKey;
- return this;
- }
-
- public Builder setFieldDelimiter(final String fieldDelimiter){
- this.fieldDelimiter = fieldDelimiter;
- return this;
- }
-
- public Builder setDelete(final boolean deleteKeys){
- this.deleteKeys = deleteKeys;
- return this;
- }
-
- public AccumuloRecordConfiguration build(){
- return new
AccumuloRecordConfiguration(tableName,rowFieldName,columnFamily,columnFamilyField,timestampField,fieldDelimiter,encodeFieldDelimiter,qualifierInKey,deleteKeys);
- }
-
-
- private String tableName;
- private String rowFieldName;
- private String columnFamily;
- private String columnFamilyField;
- private String fieldDelimiter;
- private boolean qualifierInKey=false;
- private boolean encodeFieldDelimiter=false;
- private String timestampField;
- private boolean deleteKeys=false;
- }
-}
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java
deleted file mode 100644
index adb7da8d4f..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
-
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.accumulo.data;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldRemovalPath;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
-public class KeySchema implements RecordSchema {
- private static final List<RecordField> KEY_FIELDS = new ArrayList<>();
-
- private static final List<DataType> DATA_TYPES = new ArrayList<>();
-
- private static final List<String> FIELD_NAMES = new ArrayList<>();
-
- static {
- KEY_FIELDS.add(new RecordField("row",
RecordFieldType.STRING.getDataType(),false));
- KEY_FIELDS.add(new
RecordField("columnFamily",RecordFieldType.STRING.getDataType(),true));
- KEY_FIELDS.add(new
RecordField("columnQualifier",RecordFieldType.STRING.getDataType(),true));
- KEY_FIELDS.add(new
RecordField("columnVisibility",RecordFieldType.STRING.getDataType(),true));
- KEY_FIELDS.add(new
RecordField("timestamp",RecordFieldType.LONG.getDataType(),true));
- DATA_TYPES.add(RecordFieldType.STRING.getDataType());
- DATA_TYPES.add(RecordFieldType.LONG.getDataType());
- FIELD_NAMES.addAll(KEY_FIELDS.stream().map( x->
x.getFieldName()).collect(Collectors.toList()));
- }
- @Override
- public List<RecordField> getFields() {
- return KEY_FIELDS;
- }
-
- @Override
- public int getFieldCount() {
- return KEY_FIELDS.size();
- }
-
- @Override
- public RecordField getField(int i) {
- return KEY_FIELDS.get(i);
- }
-
- @Override
- public List<DataType> getDataTypes() {
- return DATA_TYPES;
- }
-
- @Override
- public List<String> getFieldNames() {
- return FIELD_NAMES;
- }
-
- @Override
- public Optional<DataType> getDataType(String s) {
- if (s.equalsIgnoreCase("timestamp")){
- return Optional.of( RecordFieldType.LONG.getDataType() );
- } else{
- if (FIELD_NAMES.stream().filter(x ->
s.equalsIgnoreCase(s)).count() > 0){
- return Optional.of(RecordFieldType.STRING.getDataType());
- }
- }
- return Optional.empty();
- }
-
- @Override
- public Optional<String> getSchemaText() {
- return Optional.empty();
- }
-
- @Override
- public Optional<String> getSchemaFormat() {
- return Optional.empty();
- }
-
- @Override
- public Optional<RecordField> getField(final String s) {
- return KEY_FIELDS.stream().filter(x ->
x.getFieldName().equalsIgnoreCase(s)).findFirst();
- }
-
- @Override
- public SchemaIdentifier getIdentifier() {
- return
SchemaIdentifier.builder().name("AccumuloKeySchema").version(1).branch("nifi-accumulo").build();
- }
-
- @Override
- public Optional<String> getSchemaName() {
- return Optional.of("AccumuloKeySchema");
- }
-
- @Override
- public Optional<String> getSchemaNamespace() {
- return Optional.of("nifi-accumulo");
- }
-
- @Override
- public void removeField(String fieldName) {
- throw new NotImplementedException("Field removal from Accumulo
KeySchema is not implemented.");
- }
-
- @Override
- public void removePath(RecordFieldRemovalPath path) {
- throw new NotImplementedException("Path removal from Accumulo
KeySchema is not implemented.");
- }
-
- @Override
- public boolean renameField(final String currentName, final String newName)
{
- throw new NotImplementedException("Field renaming from Accumulo
KeySchema is not implemented.");
- }
-
- @Override
- public boolean isRecursive() {
- throw new NotImplementedException("Determining if an Accumulo
KeySchema is recursive is not implemented.");
- }
-}
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
deleted file mode 100644
index 7c8ed9735f..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
-
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.accumulo.processors;
-
-import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Base Accumulo class that provides connector services, table name, and thread
- * properties
- */
-public abstract class BaseAccumuloProcessor extends AbstractProcessor {
-
- protected static final PropertyDescriptor ACCUMULO_CONNECTOR_SERVICE = new
PropertyDescriptor.Builder()
- .name("accumulo-connector-service")
- .displayName("Accumulo Connector Service")
- .description("Specifies the Controller Service to use for
accessing Accumulo.")
- .required(true)
- .identifiesControllerService(BaseAccumuloService.class)
- .build();
-
-
- protected static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
- .name("Table Name")
- .description("The name of the Accumulo Table into which data will
be placed")
- .required(true)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- protected static final PropertyDescriptor CREATE_TABLE = new
PropertyDescriptor.Builder()
- .name("Create Table")
- .description("Creates a table if it does not exist. This property
will only be used when EL is not present in 'Table Name'")
- .required(true)
- .defaultValue("False")
- .allowableValues("True", "False")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
-
-
- protected static final PropertyDescriptor THREADS = new
PropertyDescriptor.Builder()
- .name("Threads")
- .description("Number of threads used for reading and writing")
- .required(false)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("10")
- .build();
-
- protected static final PropertyDescriptor ACCUMULO_TIMEOUT = new
PropertyDescriptor.Builder()
- .name("accumulo-timeout")
- .displayName("Accumulo Timeout")
- .description("Max amount of time to wait for an unresponsive
server. Set to 0 sec for no timeout. Entered value less than 1 second may be
converted to 0 sec.")
- .required(false)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("30 sec")
- .build();
-
- /**
- * Implementations can decide to include all base properties or
individually include them. List is immutable
- * so that implementations must constructor their own lists knowingly
- */
-
- protected static final List<PropertyDescriptor> baseProperties =
Collections.unmodifiableList(Arrays.asList(ACCUMULO_CONNECTOR_SERVICE,
TABLE_NAME, CREATE_TABLE, THREADS, ACCUMULO_TIMEOUT));
-}
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
deleted file mode 100644
index 09b094153e..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
+++ /dev/null
@@ -1,658 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
-
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.accumulo.processors;
-
-
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.MultiTableBatchWriter;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.hadoop.io.Text;
-import org.apache.nifi.annotation.behavior.DynamicProperties;
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.record.path.FieldValue;
-import org.apache.nifi.record.path.RecordPath;
-import org.apache.nifi.record.path.RecordPathResult;
-import org.apache.nifi.record.path.util.RecordPathCache;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.StringUtils;
-import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
-import org.apache.nifi.accumulo.data.AccumuloRecordConfiguration;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.HexFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-@SupportsBatching
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@Tags({"hadoop", "accumulo", "put", "record"})
-@CapabilityDescription("This is a record aware processor that reads the
content of the incoming FlowFile as individual records using the " +
- "configured 'Record Reader' and writes them to Apache Accumulo.")
-@DynamicProperties({
- @DynamicProperty(name = "visibility.<COLUMN FAMILY>", description =
"Visibility label for everything under that column family " +
- "when a specific label for a particular column qualifier is
not available.", expressionLanguageScope =
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
- value = "visibility label for <COLUMN FAMILY>"
- ),
- @DynamicProperty(name = "visibility.<COLUMN FAMILY>.<COLUMN
QUALIFIER>", description = "Visibility label for the specified column qualifier
" +
- "qualified by a configured column family.",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
- value = "visibility label for <COLUMN FAMILY>:<COLUMN
QUALIFIER>."
- )
-})
-/**
- * Purpose and Design: Requires a connector be defined by way of an
AccumuloService object. This class
- * simply extends BaseAccumuloProcessor to extract records from a flow file.
The location of a record field value can be
- * placed into the value or part of the column qualifier ( this can/may change
)
- *
- * Supports deletes. If the delete flag is used we'll delete keys found within
that flow file.
- */
-public class PutAccumuloRecord extends BaseAccumuloProcessor {
-
- protected static final PropertyDescriptor MEMORY_SIZE = new
PropertyDescriptor.Builder()
- .name("Memory Size")
- .description("The maximum memory size Accumulo at any one time
from the record set.")
- .required(true)
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .defaultValue("10 MB")
- .build();
-
- protected static final PropertyDescriptor COLUMN_FAMILY = new
PropertyDescriptor.Builder()
- .name("Column Family")
- .description("The Column Family to use when inserting data into
Accumulo")
- .required(false)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(Validator.VALID)
- .build();
-
- protected static final PropertyDescriptor COLUMN_FAMILY_FIELD = new
PropertyDescriptor.Builder()
- .name("Column Family Field")
- .description("Field name used as the column family if one is not
specified above.")
- .required(false)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(Validator.VALID)
- .build();
-
- protected static final PropertyDescriptor DELETE_KEY = new
PropertyDescriptor.Builder()
- .name("delete-key")
- .displayName("Delete Key")
- .description("Deletes the key")
- .required(false)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- protected static final PropertyDescriptor RECORD_IN_QUALIFIER = new
PropertyDescriptor.Builder()
- .name("record-value-in-qualifier")
- .displayName("Record Value In Qualifier")
- .description("Places the record value into the column qualifier
instead of the value.")
- .required(false)
- .defaultValue("False")
- .allowableValues("True", "False")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
-
- protected static final PropertyDescriptor FLUSH_ON_FLOWFILE = new
PropertyDescriptor.Builder()
- .name("flush-on-flow-file")
- .displayName("Flush Every FlowFile")
- .description("Flushes the table writer on every flow file.")
- .required(true)
- .defaultValue("True")
- .allowableValues("True", "False")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
-
- protected static final PropertyDescriptor FIELD_DELIMITER_AS_HEX = new
PropertyDescriptor.Builder()
- .name("field-delimiter-as-hex")
- .displayName("Hex Encode Field Delimiter")
- .description("Allows you to hex encode the delimiter as a
character. So 0x00 places a null character between the record name and value.")
- .required(false)
- .defaultValue("False")
- .allowableValues("True", "False")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
-
- protected static final PropertyDescriptor FIELD_DELIMITER = new
PropertyDescriptor.Builder()
- .name("field-delimiter")
- .displayName("Field Delimiter")
- .description("Delimiter between the record value and name. ")
- .required(false)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
-
- protected static final PropertyDescriptor RECORD_READER_FACTORY = new
PropertyDescriptor.Builder()
- .name("record-reader")
- .displayName("Record Reader")
- .description("Specifies the Controller Service to use for parsing
incoming data and determining the data's schema")
- .identifiesControllerService(RecordReaderFactory.class)
- .required(true)
- .build();
-
- protected static final PropertyDescriptor ROW_FIELD_NAME = new
PropertyDescriptor.Builder()
- .name("Row Identifier Field Name")
- .description("Specifies the name of a record field whose value
should be used as the row id for the given record." +
- " If EL defines a value that is not a field name that will
be used as the row identifier.")
- .required(true)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
-
- protected static final PropertyDescriptor TIMESTAMP_FIELD = new
PropertyDescriptor.Builder()
- .name("timestamp-field")
- .displayName("Timestamp Field")
- .description("Specifies the name of a record field whose value
should be used as the timestamp. If empty a timestamp will be recorded as the
time of insertion")
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
-
- protected static final PropertyDescriptor VISIBILITY_PATH = new
PropertyDescriptor.Builder()
- .name("visibility-path")
- .displayName("Visibility String Record Path Root")
- .description("A record path that points to part of the record
which contains a path to a mapping of visibility strings to record paths")
- .required(false)
- .addValidator(Validator.VALID)
- .build();
-
- protected static final PropertyDescriptor DEFAULT_VISIBILITY = new
PropertyDescriptor.Builder()
- .name("default-visibility")
- .displayName("Default Visibility")
- .description("Default visibility when VISIBILITY_PATH is not
defined. ")
- .required(false)
- .addValidator(Validator.VALID)
- .build();
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("A FlowFile is routed to this relationship after it
has been successfully stored in Accumulo")
- .build();
- public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("A FlowFile is routed to this relationship if it
cannot be sent to Accumulo")
- .build();
-
-
- /**
- * Connector service which provides us a connector if the configuration is
correct.
- */
- protected BaseAccumuloService accumuloConnectorService;
-
- /**
- * Connector that we need to persist while we are operational.
- */
- protected AccumuloClient client;
-
- /**
- * Table writer that will close when we shutdown or upon error.
- */
- private MultiTableBatchWriter tableWriter = null;
-
- /**
- * Record path cache
- */
- protected RecordPathCache recordPathCache;
-
-
- /**
- * Flushes the tableWriter on every flow file if true.
- */
- protected boolean flushOnEveryFlow;
-
- @Override
- public Set<Relationship> getRelationships() {
- final Set<Relationship> rels = new HashSet<>();
- rels.add(REL_SUCCESS);
- rels.add(REL_FAILURE);
- return rels;
- }
-
- @Override
- protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
- Collection<ValidationResult> set = new HashSet<>();
- if (!validationContext.getProperty(COLUMN_FAMILY).isSet() &&
!validationContext.getProperty(COLUMN_FAMILY_FIELD).isSet())
- set.add(new ValidationResult.Builder().explanation("Column Family
OR Column family field name must be defined").build());
- else if (validationContext.getProperty(COLUMN_FAMILY).isSet() &&
validationContext.getProperty(COLUMN_FAMILY_FIELD).isSet())
- set.add(new ValidationResult.Builder().explanation("Column Family
OR Column family field name must be defined, but not both").build());
- return set;
- }
-
- @OnScheduled
- public void onScheduled(final ProcessContext context) {
- accumuloConnectorService =
context.getProperty(ACCUMULO_CONNECTOR_SERVICE).asControllerService(BaseAccumuloService.class);
- final Double maxBytes =
context.getProperty(MEMORY_SIZE).asDataSize(DataUnit.B);
- this.client = accumuloConnectorService.getClient();
- BatchWriterConfig writerConfig = new BatchWriterConfig();
-
writerConfig.setMaxWriteThreads(context.getProperty(THREADS).asInteger());
- writerConfig.setMaxMemory(maxBytes.longValue());
-
writerConfig.setTimeout(context.getProperty(ACCUMULO_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).longValue(),
TimeUnit.SECONDS);
- tableWriter = client.createMultiTableBatchWriter(writerConfig);
- flushOnEveryFlow = context.getProperty(FLUSH_ON_FLOWFILE).asBoolean();
- if (!flushOnEveryFlow){
- writerConfig.setMaxLatency(60, TimeUnit.SECONDS);
- }
-
- if (context.getProperty(CREATE_TABLE).asBoolean() &&
!context.getProperty(TABLE_NAME).isExpressionLanguagePresent()) {
- final Map<String, String> flowAttributes = new HashMap<>();
- final String table =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowAttributes).getValue();
- final TableOperations tableOps = this.client.tableOperations();
- if (!tableOps.exists(table)) {
- getLogger().info("Creating " + table + " table.");
- try {
- tableOps.create(table);
- } catch (TableExistsException te) {
- // can safely ignore
- } catch (AccumuloSecurityException | AccumuloException e) {
- getLogger().info("Accumulo or Security error creating.
Continuing... " + table + ". ", e);
- }
- }
- }
- }
-
-
- @OnUnscheduled
- @OnDisabled
- public synchronized void shutdown(){
- /**
- * Close the writer when we are shut down.
- */
- if (null != tableWriter){
- try {
- tableWriter.close();
- } catch (MutationsRejectedException e) {
- getLogger().error("Mutations were rejected",e);
- }
- tableWriter = null;
- }
- }
-
- @Override
- public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- final List<PropertyDescriptor> properties = new
ArrayList<>(baseProperties);
- properties.add(RECORD_READER_FACTORY);
- properties.add(ROW_FIELD_NAME);
- properties.add(ROW_FIELD_NAME);
- properties.add(COLUMN_FAMILY);
- properties.add(COLUMN_FAMILY_FIELD);
- properties.add(DELETE_KEY);
- properties.add(FLUSH_ON_FLOWFILE);
- properties.add(FIELD_DELIMITER);
- properties.add(FIELD_DELIMITER_AS_HEX);
- properties.add(MEMORY_SIZE);
- properties.add(RECORD_IN_QUALIFIER);
- properties.add(TIMESTAMP_FIELD);
- properties.add(VISIBILITY_PATH);
- properties.add(DEFAULT_VISIBILITY);
- return properties;
- }
-
-
- @Override
- public void onTrigger(ProcessContext processContext, ProcessSession
processSession) throws ProcessException {
- final FlowFile flowFile = processSession.get();
- if (flowFile == null) {
- return;
- }
-
- final RecordReaderFactory recordParserFactory =
processContext.getProperty(RECORD_READER_FACTORY)
- .asControllerService(RecordReaderFactory.class);
-
- final String recordPathText =
processContext.getProperty(VISIBILITY_PATH).getValue();
- final String defaultVisibility =
processContext.getProperty(DEFAULT_VISIBILITY).isSet() ?
processContext.getProperty(DEFAULT_VISIBILITY).getValue() : null;
-
- final String tableName =
processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-
- accumuloConnectorService.renewTgtIfNecessary();
-
- // create the table if EL is present, create table is true and the
table does not exist.
- if
(processContext.getProperty(TABLE_NAME).isExpressionLanguagePresent() &&
processContext.getProperty(CREATE_TABLE).asBoolean()) {
- final TableOperations tableOps = this.client.tableOperations();
- if (!tableOps.exists(tableName)) {
- getLogger().info("Creating " + tableName + " table.");
- try {
- tableOps.create(tableName);
- } catch (TableExistsException te) {
- // can safely ignore, though we shouldn't arrive here due
to table.exists called, but it's possible
- // that with multiple threads two could attempt table
creation concurrently. We don't want that
- // to be a failure.
- } catch (AccumuloSecurityException | AccumuloException e) {
- throw new ProcessException("Accumulo or Security error
creating. Continuing... " + tableName + ". ",e);
- }
- }
- }
-
- AccumuloRecordConfiguration builder =
AccumuloRecordConfiguration.Builder.newBuilder()
- .setTableName(tableName)
-
.setColumnFamily(processContext.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue())
-
.setColumnFamilyField(processContext.getProperty(COLUMN_FAMILY_FIELD).evaluateAttributeExpressions(flowFile).getValue())
-
.setRowField(processContext.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue())
-
.setEncodeFieldDelimiter(processContext.getProperty(FIELD_DELIMITER_AS_HEX).asBoolean())
-
.setFieldDelimiter(processContext.getProperty(FIELD_DELIMITER).isSet() ?
processContext.getProperty(FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue()
: "")
-
.setQualifierInKey(processContext.getProperty(RECORD_IN_QUALIFIER).isSet() ?
processContext.getProperty(RECORD_IN_QUALIFIER).asBoolean() : false)
- .setDelete(processContext.getProperty(DELETE_KEY).isSet() ?
processContext.getProperty(DELETE_KEY).evaluateAttributeExpressions(flowFile).asBoolean()
: false)
-
.setTimestampField(processContext.getProperty(TIMESTAMP_FIELD).evaluateAttributeExpressions(flowFile).getValue()).build();
-
-
- RecordPath recordPath = null;
- if (recordPathCache != null && !StringUtils.isEmpty(recordPathText)) {
- recordPath = recordPathCache.getCompiled(recordPathText);
- }
-
- boolean failed = false;
- Mutation prevMutation=null;
- try (final InputStream in = processSession.read(flowFile);
- final RecordReader reader =
recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
- Record record;
- /**
- * HBase supports a restart point. This may be something that we
can/should add if needed.
- */
- while ((record = reader.nextRecord()) != null) {
- prevMutation = createMutation(prevMutation, processContext,
record, reader.getSchema(), recordPath, flowFile,defaultVisibility, builder);
-
- }
- addMutation(builder.getTableName(),prevMutation);
- } catch (Exception ex) {
- getLogger().error("Failed to put records to Accumulo.", ex);
- failed = true;
- }
-
- if (flushOnEveryFlow){
- try {
- tableWriter.flush();
- } catch (MutationsRejectedException e) {
- throw new ProcessException(e);
- }
- }
-
-
- if (failed) {
- processSession.transfer(processSession.penalize(flowFile),
REL_FAILURE);
- } else {
- processSession.transfer(flowFile, REL_SUCCESS);
- }
- }
-
- /**
- * Adapted from HBASEUtils. Their approach seemed ideal for what our
intent is here.
- * @param columnFamily column family from which to extract the visibility
or to execute an expression against
- * @param columnQualifier column qualifier from which to extract the
visibility or to execute an expression against
- * @param flowFile flow file being written
- * @param context process context
- * @return Visibility
- */
- public static String produceVisibility(String columnFamily, String
columnQualifier, FlowFile flowFile, ProcessContext context) {
- if (org.apache.commons.lang3.StringUtils.isNotEmpty(columnFamily)) {
- return null;
- }
- String lookupKey = String.format("visibility.%s%s%s", columnFamily,
!org.apache.commons.lang3.StringUtils.isNotEmpty(columnQualifier) ? "." : "",
columnQualifier);
- String fromAttribute = flowFile.getAttribute(lookupKey);
-
- if (fromAttribute == null &&
!org.apache.commons.lang3.StringUtils.isBlank(columnQualifier)) {
- String lookupKeyFam = String.format("visibility.%s", columnFamily);
- fromAttribute = flowFile.getAttribute(lookupKeyFam);
- }
-
- if (fromAttribute != null) {
- return fromAttribute;
- } else {
- PropertyValue descriptor = context.getProperty(lookupKey);
- if (descriptor == null || !descriptor.isSet()) {
- descriptor =
context.getProperty(String.format("visibility.%s", columnFamily));
- }
-
- String retVal = descriptor != null ?
descriptor.evaluateAttributeExpressions(flowFile).getValue() : null;
-
- return retVal;
- }
- }
-
- private void addMutation(final String tableName, final Mutation m) throws
AccumuloSecurityException, AccumuloException, TableNotFoundException {
- tableWriter.getBatchWriter(tableName).addMutation(m);
-
- }
-
- /**
- * Returns the row provided the record schema
- * @param record record against which we are evaluating
- * @param schema Record schema
- * @param rowOrFieldName Row identifier or field name
- * @return Text object containing the resulting row.
- */
- private Text getRow(final Record record,
- final RecordSchema schema,
- final String rowOrFieldName){
- if ( !schema.getFieldNames().contains(rowOrFieldName) ){
- return new Text(rowOrFieldName);
- } else{
- return new Text(record.getAsString(rowOrFieldName));
- }
- }
-
- /**
- * Creates a mutation with the provided arguments
- * @param prevMutation previous mutation, to append to if in the same row.
- * @param context process context.
- * @param record record object extracted from the flow file
- * @param schema schema for this record
- * @param recordPath record path for visibility extraction
- * @param flowFile flow file
- * @param defaultVisibility default visibility
- * @param config configuration of this instance.
- * @return Returns the Mutation to insert
- * @throws AccumuloSecurityException Error accessing Accumulo
- * @throws AccumuloException Non security ( or table ) related Accumulo
exceptions writing to the store.
- * @throws TableNotFoundException Table not found on the cluster
- */
- protected Mutation createMutation(final Mutation prevMutation,
- final ProcessContext context,
- final Record record,
- final RecordSchema schema,
- final RecordPath recordPath,
- final FlowFile flowFile,
- final String defaultVisibility,
- AccumuloRecordConfiguration config)
throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
- Mutation m=null;
- if (record != null) {
-
- final Long timestamp;
- Set<String> fieldsToSkip = new HashSet<>();
- if (!StringUtils.isBlank(config.getTimestampField())) {
- try {
- timestamp = record.getAsLong(config.getTimestampField());
- fieldsToSkip.add(config.getTimestampField());
- } catch (Exception e) {
- throw new AccumuloException("Could not convert " +
config.getTimestampField() + " to a long", e);
- }
-
- if (timestamp == null) {
- getLogger().warn("The value of timestamp field " +
config.getTimestampField() + " was null, record will be inserted with latest
timestamp");
- }
- } else {
- timestamp = null;
- }
-
-
-
- RecordField visField = null;
- Map visSettings = null;
- if (recordPath != null) {
- final RecordPathResult result = recordPath.evaluate(record);
- FieldValue fv = result.getSelectedFields().findFirst().get();
- visField = fv.getField();
- if (null != visField)
- fieldsToSkip.add(visField.getFieldName());
- visSettings = (Map)fv.getValue();
- }
-
-
- if (null != prevMutation){
- Text row = new Text(prevMutation.getRow());
- Text curRow = getRow(record,schema,config.getRowField());
- if (row.equals(curRow)){
- m = prevMutation;
- } else{
- m = new Mutation(curRow);
- addMutation(config.getTableName(),prevMutation);
- }
- } else{
- Text row = getRow(record,schema,config.getRowField());
- m = new Mutation(row);
- }
-
- fieldsToSkip.add(config.getRowField());
-
- String columnFamily = config.getColumnFamily();
- if (StringUtils.isBlank(columnFamily) &&
!StringUtils.isBlank(config.getColumnFamilyField())) {
- final String cfField = config.getColumnFamilyField();
- columnFamily = record.getAsString(cfField);
- fieldsToSkip.add(cfField);
- } else if (StringUtils.isBlank(columnFamily) &&
StringUtils.isBlank(config.getColumnFamilyField())){
- throw new IllegalArgumentException("Invalid configuration for
column family " + columnFamily + " and " + config.getColumnFamilyField());
- }
- final Text cf = new Text(columnFamily);
-
- for (String name :
schema.getFieldNames().stream().filter(p->!fieldsToSkip.contains(p)).collect(Collectors.toList()))
{
- String visString = (visField != null && visSettings != null &&
visSettings.containsKey(name))
- ? (String)visSettings.get(name) : defaultVisibility;
-
- Text cq = new Text(name);
- final Value value;
- String recordValue = record.getAsString(name);
- if (config.getQualifierInKey()){
- final String delim = config.getFieldDelimiter();
- if (!StringUtils.isEmpty(delim)) {
- if (config.getEncodeDelimiter()) {
- byte [] asHex = HexFormat.of().parseHex(delim);
- cq.append(asHex, 0, asHex.length);
- }else{
- cq.append(delim.getBytes(), 0, delim.length());
- }
- }
- cq.append(recordValue.getBytes(),0,recordValue.length());
- value = new Value();
- } else{
- value = new Value(recordValue.getBytes());
- }
-
- if (StringUtils.isBlank(visString)) {
- visString = produceVisibility(cf.toString(),
cq.toString(), flowFile, context);
- }
-
- ColumnVisibility cv = new ColumnVisibility();
- if (StringUtils.isBlank(visString)) {
- if (!StringUtils.isBlank(defaultVisibility)) {
- cv = new ColumnVisibility(defaultVisibility);
- }
- } else {
- cv = new ColumnVisibility(visString);
- }
-
- if (null != timestamp) {
- if (config.isDeleteKeys()) {
- m.putDelete(cf, cq, cv, timestamp);
- } else {
- m.put(cf, cq, cv, timestamp, value);
- }
- } else{
- if (config.isDeleteKeys())
- m.putDelete(cf, cq, cv);
- else
- m.put(cf, cq, cv, value);
- }
- }
-
-
-
- }
-
- return m;
- }
-
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
- /**
- * Adapted from HBase puts. This is a good approach and one that we
should adopt here, too.
- */
- if (propertyDescriptorName.startsWith("visibility.")) {
- String[] parts = propertyDescriptorName.split("\\.");
- String displayName;
- String description;
-
- if (parts.length == 2) {
- displayName = String.format("Column Family %s Default
Visibility", parts[1]);
- description = String.format("Default visibility setting for
%s", parts[1]);
- } else if (parts.length == 3) {
- displayName = String.format("Column Qualifier %s.%s Default
Visibility", parts[1], parts[2]);
- description = String.format("Default visibility setting for
%s.%s", parts[1], parts[2]);
- } else {
- return null;
- }
-
- return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .displayName(displayName)
- .description(description)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .dynamic(true)
- .build();
- }
-
- return null;
- }
-}
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
deleted file mode 100644
index e9ad151be2..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
-
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.accumulo.processors;
-
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.io.Text;
-import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
-import org.apache.nifi.accumulo.data.KeySchema;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.configuration.DefaultSchedule;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.StringUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.LongAdder;
-
-@SupportsBatching
-@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
-@CapabilityDescription("Scan the given table and writes result in a flowfile.
Value will be represented as UTF-8 Encoded String.")
-@Tags({"hadoop", "accumulo", "scan", "record"})
-@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
-/**
- * Purpose and Design: Requires a connector be defined by way of an
AccumuloService object. This class
- * simply extends BaseAccumuloProcessor to scan accumulo based on aspects and
expression executed against
- * a flow file
- *
- */
-public class ScanAccumulo extends BaseAccumuloProcessor {
-
- static final PropertyDescriptor START_KEY = new
PropertyDescriptor.Builder()
- .displayName("Start key")
- .name("start-key")
- .description("Start row key")
- .required(false)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(Validator.VALID)
- .build();
-
- static final PropertyDescriptor START_KEY_INCLUSIVE = new
PropertyDescriptor.Builder()
- .displayName("Start key Inclusive")
- .name("start-key-inclusive")
- .description("Determines if the start key is inclusive ")
- .required(false)
- .defaultValue("True")
- .allowableValues("True", "False")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
-
- static final PropertyDescriptor END_KEY = new PropertyDescriptor.Builder()
- .displayName("End key")
- .name("end-key")
- .description("End row key for this. If not specified or empty this
will be infinite")
- .required(false)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(Validator.VALID)
- .build();
-
- static final PropertyDescriptor END_KEY_INCLUSIVE = new
PropertyDescriptor.Builder()
- .displayName("End key Inclusive")
- .name("end-key-inclusive")
- .description("Determines if the end key is inclusive")
- .required(false)
- .defaultValue("False")
- .allowableValues("True", "False")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
-
- static final PropertyDescriptor AUTHORIZATIONS = new
PropertyDescriptor.Builder()
- .name("accumulo-authorizations")
- .displayName("Authorizations")
- .description("The comma separated list of authorizations to pass
to the scanner.")
- .required(true)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(Validator.VALID)
- .build();
-
- static final PropertyDescriptor COLUMNFAMILY = new
PropertyDescriptor.Builder()
- .name("column-family")
- .displayName("Start Column Family")
- .description("The column family that is part of the start key. If
no column key is defined only this column family will be selected")
- .required(false)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(Validator.VALID)
- .build();
-
- static final PropertyDescriptor COLUMNFAMILY_END = new
PropertyDescriptor.Builder()
- .name("column-family-end")
- .displayName("End Column Family")
- .description("The column family to select is part of end key")
- .required(false)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(Validator.VALID)
- .build();
-
- static final PropertyDescriptor VALUE_INCLUDED_IN_RESULT = new
PropertyDescriptor.Builder()
- .displayName("Value Included in Result")
- .name("accumulo-value-inclusive")
- .description("Beside keys and their values, accumulo value field
will also be included in the result as UTF-8 Encoded String.")
- .required(false)
- .defaultValue("True")
- .allowableValues("True", "False")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("A FlowFile is routed to this relationship after it
has been successfully retrieved from Accumulo")
- .build();
- public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("A FlowFile is routed to this relationship if it
cannot be retrieved fromAccumulo")
- .build();
-
- static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
- .name("record-writer")
- .displayName("Record Writer")
- .description("Specifies the Controller Service to use for writing
out the records")
- .identifiesControllerService(RecordSetWriterFactory.class)
- .required(true)
- .build();
-
- /**
- * Connector service which provides us a connector if the configuration is
correct.
- */
- protected BaseAccumuloService accumuloConnectorService;
-
- /**
- * Connector that we need to persist while we are operational.
- */
- protected AccumuloClient client;
-
-
- @Override
- public Set<Relationship> getRelationships() {
- final Set<Relationship> rels = new HashSet<>();
- rels.add(REL_SUCCESS);
- rels.add(REL_FAILURE);
- return rels;
- }
-
-
- @Override
- protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
- Collection<ValidationResult> set = new ArrayList<>();
- if ((validationContext.getProperty(COLUMNFAMILY).isSet() &&
!validationContext.getProperty(COLUMNFAMILY_END).isSet())
- || !validationContext.getProperty(COLUMNFAMILY).isSet() &&
validationContext.getProperty(COLUMNFAMILY_END).isSet() )
- set.add(new ValidationResult.Builder().explanation("Column Family
and Column family end must be defined").build());
- return set;
- }
-
- @OnScheduled
- public void onScheduled(final ProcessContext context) {
- accumuloConnectorService =
context.getProperty(ACCUMULO_CONNECTOR_SERVICE).asControllerService(BaseAccumuloService.class);
- this.client = accumuloConnectorService.getClient();
- }
-
- private Authorizations stringToAuth(final String authorizations){
- if (!StringUtils.isBlank(authorizations))
- return new Authorizations(authorizations.split(","));
- else
- return new Authorizations();
- }
-
-
- protected long scanAccumulo(final RecordSetWriterFactory writerFactory,
final ProcessContext processContext, final ProcessSession processSession, final
Optional<FlowFile> incomingFlowFile){
-
- final Map<String, String> flowAttributes =
incomingFlowFile.isPresent() ? incomingFlowFile.get().getAttributes() : new
HashMap<>();
- final String table =
processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowAttributes).getValue();
- final String startKey =
processContext.getProperty(START_KEY).evaluateAttributeExpressions(flowAttributes).getValue();
- final boolean startKeyInclusive =
processContext.getProperty(START_KEY_INCLUSIVE).asBoolean();
- final boolean endKeyInclusive =
processContext.getProperty(END_KEY_INCLUSIVE).asBoolean();
- final String endKey =
processContext.getProperty(END_KEY).evaluateAttributeExpressions(flowAttributes).getValue();
- final String authorizations =
processContext.getProperty(AUTHORIZATIONS).isSet()
- ?
processContext.getProperty(AUTHORIZATIONS).evaluateAttributeExpressions(flowAttributes).getValue()
: "";
- final int threads = processContext.getProperty(THREADS).asInteger();
- final String startKeyCf =
processContext.getProperty(COLUMNFAMILY).evaluateAttributeExpressions(flowAttributes).getValue();
- final String endKeyCf =
processContext.getProperty(COLUMNFAMILY_END).evaluateAttributeExpressions(flowAttributes).getValue();
- final boolean valueIncluded =
processContext.getProperty(VALUE_INCLUDED_IN_RESULT).asBoolean();
-
- final Authorizations auths = stringToAuth(authorizations);
-
- final LongAdder recordCounter = new LongAdder();
-
- final Range lookupRange =
buildRange(startKey,startKeyCf,startKeyInclusive,endKey,endKeyCf,endKeyInclusive);
-
- boolean cloneFlowFile = incomingFlowFile.isPresent();
-
- accumuloConnectorService.renewTgtIfNecessary();
-
- try (BatchScanner scanner =
client.createBatchScanner(table,auths,threads)) {
- if (!StringUtils.isBlank(startKeyCf) &&
StringUtils.isBlank(endKeyCf))
- scanner.fetchColumnFamily(new Text(startKeyCf));
- scanner.setRanges(Collections.singleton(lookupRange));
-
scanner.setTimeout(processContext.getProperty(ACCUMULO_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).longValue(),
TimeUnit.SECONDS);
-
- final Iterator<Map.Entry<Key,Value>> kvIter = scanner.iterator();
- if (!kvIter.hasNext()){
- /**
- * Create a flow file with a record count of zero.
- */
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("record.count", String.valueOf(0));
- final FlowFile newFlow = processSession.create();
- processSession.putAllAttributes(newFlow,attributes);
- processSession.transfer(newFlow, REL_SUCCESS);
- return 0;
- } else{
-
- while (kvIter.hasNext()) {
- FlowFile iterationFlowFile = cloneFlowFile ?
processSession.clone(incomingFlowFile.get()) : processSession.create();
-
- final int keysPerFlowFile = 1000;
- final Map<String, String> attributes = new HashMap<>();
- iterationFlowFile =
processSession.write(iterationFlowFile, new StreamCallback() {
- @Override
- public void process(final InputStream in, final
OutputStream out) throws IOException {
-
- try{
- final RecordSchema writeSchema =
determineRecordSchema(writerFactory, flowAttributes, valueIncluded);
-
- try (final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), writeSchema, out,
Collections.emptyMap())) {
-
- int i = 0;
- writer.beginRecordSet();
- for (; i < keysPerFlowFile &&
kvIter.hasNext(); i++) {
-
- Map.Entry<Key, Value> kv =
kvIter.next();
-
- final Key key = kv.getKey();
-
- Map<String, Object> data = new
HashMap<>();
- data.put("row",
key.getRow().toString());
- data.put("columnFamily",
key.getColumnFamily().toString());
- data.put("columnQualifier",
key.getColumnQualifier().toString());
- data.put("columnVisibility",
key.getColumnVisibility().toString());
- data.put("timestamp",
key.getTimestamp());
- if (valueIncluded) {
- data.put("value",
Objects.isNull(kv.getValue()) ? null : kv.getValue().toString());
- }
-
- Record record = new
MapRecord(writeSchema, data);
- writer.write(record);
-
-
- }
- recordCounter.add(i);
-
- final WriteResult writeResult =
writer.finishRecordSet();
- attributes.put("record.count",
String.valueOf(i));
-
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
-
attributes.putAll(writeResult.getAttributes());
- }
- } catch (SchemaNotFoundException e) {
- getLogger().error("Failed to process {}; will
route to failure", new Object[] {
- incomingFlowFile.isPresent() ?
incomingFlowFile.get() : "No incoming flow file", e});
-
- throw new IOException(e);
- }
- }
-
- });
-
processSession.putAllAttributes(iterationFlowFile,attributes);
- processSession.transfer(iterationFlowFile, REL_SUCCESS);
- }
- }
- } catch (final Exception e) {
- getLogger().error("Failed to process {}; will route to failure",
new Object[] {incomingFlowFile.isPresent() ? incomingFlowFile.get() : "No
incoming flow file", e});
- if (cloneFlowFile) {
- processSession.transfer(incomingFlowFile.get(), REL_FAILURE);
- }
- return 0;
- }
-
- if (cloneFlowFile) {
- processSession.remove(incomingFlowFile.get());
- }
-
- getLogger().info("Successfully converted {} records for {}", new
Object[] {recordCounter.longValue(), incomingFlowFile.toString()});
-
- return recordCounter.longValue();
- }
-
- private RecordSchema determineRecordSchema(RecordSetWriterFactory
writerFactory, Map<String, String> flowAttributes, boolean valueIncluded)
throws SchemaNotFoundException, IOException {
- final RecordSchema writeSchema =
writerFactory.getSchema(flowAttributes, new KeySchema());
-
- if (valueIncluded) {
- final List<RecordField> recordSchemaFields = new ArrayList<>();
- recordSchemaFields.addAll(writeSchema.getFields());
- recordSchemaFields.add(new RecordField("value",
RecordFieldType.STRING.getDataType()));
- return new SimpleRecordSchema(recordSchemaFields);
- }
- return writeSchema;
- }
-
-
- Range buildRange(final String startRow, final String startKeyCf,boolean
startKeyInclusive, final String endRow, final String endKeyCf,boolean
endKeyInclusive){
- Key start = StringUtils.isBlank(startRow) ? null :
StringUtils.isBlank(startKeyCf) ? new Key(startRow) : new
Key(startRow,startKeyCf);
- Key end = StringUtils.isBlank(endRow) ? null :
StringUtils.isBlank(endKeyCf) ? new Key(endRow) : new Key(endRow,endKeyCf);
- return new Range(start,startKeyInclusive,end,endKeyInclusive);
- }
-
- @Override
- public void onTrigger(ProcessContext processContext, ProcessSession
processSession) throws ProcessException {
- FlowFile flowFile = processSession.get();
-
- final RecordSetWriterFactory writerFactory =
processContext.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
- long recordCount =
scanAccumulo(writerFactory,processContext,processSession,Optional.ofNullable(flowFile));
-
- processSession.adjustCounter("Records Processed", recordCount, false);
- }
-
-
- @Override
- public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- final List<PropertyDescriptor> properties = new
ArrayList<>(baseProperties);
- properties.add(START_KEY);
- properties.add(START_KEY_INCLUSIVE);
- properties.add(END_KEY);
- properties.add(COLUMNFAMILY);
- properties.add(COLUMNFAMILY_END);
- properties.add(END_KEY_INCLUSIVE);
- properties.add(VALUE_INCLUDED_IN_RESULT);
- properties.add(RECORD_WRITER);
- properties.add(AUTHORIZATIONS);
- return properties;
- }
-
-}
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
deleted file mode 100644
index a1ce07210c..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.accumulo.processors.PutAccumuloRecord
-org.apache.nifi.accumulo.processors.ScanAccumulo
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml
deleted file mode 100644
index 75a88a56dc..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml
+++ /dev/null
@@ -1,40 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-bundle</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
- <artifactId>nifi-accumulo-services-api-nar</artifactId>
- <packaging>nar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-standard-services-api-nar</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <type>nar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-services-api</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml
deleted file mode 100644
index 6491a3e0fb..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml
+++ /dev/null
@@ -1,35 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-bundle</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-accumulo-services-api</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo-core</artifactId>
- </dependency>
-
- </dependencies>
-</project>
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
deleted file mode 100644
index 3266ad54dd..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
-
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.accumulo.controllerservices;
-
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.controller.ControllerService;
-
-@Tags({"accumulo", "client", "service"})
-@CapabilityDescription("Provides a basic connector to Accumulo services")
-public interface BaseAccumuloService extends ControllerService {
-
-
- AccumuloClient getClient();
- void renewTgtIfNecessary();
-
-}
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml
deleted file mode 100644
index fe6c251371..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-bundle</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-accumulo-services-nar</artifactId>
- <packaging>nar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-services</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-services-api-nar</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <type>nar</type>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
deleted file mode 100644
index 60056782ac..0000000000
--- a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
+++ /dev/null
@@ -1,70 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-bundle</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-accumulo-services</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo-core</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-services-api</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client-api</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-kerberos-credentials-service-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-kerberos-user-service-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-security-kerberos</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-security-kerberos-api</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-hadoop-utils</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- </dependencies>
-</project>
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
deleted file mode 100644
index 11aaf0120c..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
-
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.accumulo.controllerservices;
-
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.client.security.tokens.KerberosToken;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.ControllerServiceInitializationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.hadoop.SecurityUtil;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
-import org.apache.nifi.kerberos.KerberosUserService;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.security.krb.KerberosKeytabUser;
-import org.apache.nifi.security.krb.KerberosPasswordUser;
-import org.apache.nifi.security.krb.KerberosUser;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Purpose: Controller service that provides us a configured connector. Note
that we don't need to close this
- *
- * Justification: Centralizes the configuration of the connecting accumulo
code. This also will be used
- * for any kerberos integration.
- */
-@RequiresInstanceClassLoading
-@Tags({"accumulo", "client", "service"})
-@CapabilityDescription("A controller service for accessing an Accumulo
Client.")
-public class AccumuloService extends AbstractControllerService implements
BaseAccumuloService {
-
- private enum AuthenticationType {
- PASSWORD,
- KERBEROS,
- NONE
- }
-
- protected static final PropertyDescriptor ZOOKEEPER_QUORUM = new
PropertyDescriptor.Builder()
- .name("ZooKeeper Quorum")
- .displayName("ZooKeeper Quorum")
- .description("Comma-separated list of ZooKeeper hosts for
Accumulo.")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- protected static final PropertyDescriptor INSTANCE_NAME = new
PropertyDescriptor.Builder()
- .name("Instance Name")
- .displayName("Instance Name")
- .description("Instance name of the Accumulo cluster")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- protected static final PropertyDescriptor AUTHENTICATION_TYPE = new
PropertyDescriptor.Builder()
- .name("accumulo-authentication-type")
- .displayName("Authentication Type")
- .description("Authentication Type")
- .allowableValues(AuthenticationType.values())
- .defaultValue(AuthenticationType.PASSWORD.toString())
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- protected static final PropertyDescriptor ACCUMULO_USER = new
PropertyDescriptor.Builder()
- .name("Accumulo User")
- .displayName("Accumulo User")
- .description("Connecting user for Accumulo")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .dependsOn(AUTHENTICATION_TYPE,
AuthenticationType.PASSWORD.toString())
- .build();
-
- protected static final PropertyDescriptor ACCUMULO_PASSWORD = new
PropertyDescriptor.Builder()
- .name("Accumulo Password")
- .displayName("Accumulo Password")
- .description("Connecting user's password")
- .sensitive(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .dependsOn(AUTHENTICATION_TYPE,
AuthenticationType.PASSWORD.toString())
- .build();
-
- protected static final PropertyDescriptor KERBEROS_USER_SERVICE = new
PropertyDescriptor.Builder()
- .name("kerberos-user-service")
- .displayName("Kerberos User Service")
- .description("Specifies the Kerberos User Controller Service that
should be used for authenticating with Kerberos")
- .identifiesControllerService(KerberosUserService.class)
- .required(false)
- .build();
-
- protected static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE =
new PropertyDescriptor.Builder()
- .name("kerberos-credentials-service")
- .displayName("Kerberos Credentials Service")
- .description("Specifies the Kerberos Credentials Controller
Service that should be used for principal + keytab Kerberos authentication")
- .identifiesControllerService(KerberosCredentialsService.class)
- .dependsOn(AUTHENTICATION_TYPE,
AuthenticationType.KERBEROS.toString())
- .build();
-
- protected static final PropertyDescriptor KERBEROS_PRINCIPAL = new
PropertyDescriptor.Builder()
- .name("kerberos-principal")
- .displayName("Kerberos Principal")
- .description("Kerberos Principal")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .dependsOn(AUTHENTICATION_TYPE,
AuthenticationType.KERBEROS.toString())
- .build();
-
- protected static final PropertyDescriptor KERBEROS_PASSWORD = new
PropertyDescriptor.Builder()
- .name("kerberos-password")
- .displayName("Kerberos Password")
- .description("Kerberos Password")
- .sensitive(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .dependsOn(AUTHENTICATION_TYPE,
AuthenticationType.KERBEROS.toString())
- .build();
-
- protected static final PropertyDescriptor ACCUMULO_SASL_QOP = new
PropertyDescriptor.Builder()
- .name("accumulo-sasl-qop")
- .displayName("Accumulo SASL quality of protection")
- .description("Accumulo SASL quality of protection for KERBEROS
Authentication type")
- .allowableValues("auth", "auth-int", "auth-conf")
- .defaultValue("auth-conf")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .dependsOn(AUTHENTICATION_TYPE,
AuthenticationType.KERBEROS.toString())
- .build();
-
- /**
- * Reference to the accumulo client.
- */
- AccumuloClient client;
-
- /**
- * properties
- */
- private List<PropertyDescriptor> properties;
-
- private KerberosUser kerberosUser;
-
- private AuthenticationType authType;
-
- @Override
- protected void init(ControllerServiceInitializationContext config) {
- List<PropertyDescriptor> props = new ArrayList<>();
- props.add(ZOOKEEPER_QUORUM);
- props.add(INSTANCE_NAME);
- props.add(AUTHENTICATION_TYPE);
- props.add(ACCUMULO_USER);
- props.add(ACCUMULO_PASSWORD);
- props.add(KERBEROS_USER_SERVICE);
- props.add(KERBEROS_CREDENTIALS_SERVICE);
- props.add(KERBEROS_PRINCIPAL);
- props.add(KERBEROS_PASSWORD);
- props.add(ACCUMULO_SASL_QOP);
- properties = Collections.unmodifiableList(props);
- }
-
- @Override
- public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
- final List<ValidationResult> problems = new ArrayList<>();
-
- if (!validationContext.getProperty(INSTANCE_NAME).isSet()){
- problems.add(new
ValidationResult.Builder().valid(false).subject(INSTANCE_NAME.getName()).explanation("Instance
name must be supplied").build());
- }
-
- if (!validationContext.getProperty(ZOOKEEPER_QUORUM).isSet()){
- problems.add(new
ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers
must be supplied").build());
- }
-
- final AuthenticationType type = validationContext.getProperty(
- AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf(
validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) :
AuthenticationType.NONE;
-
- switch(type){
- case PASSWORD:
- if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
- problems.add(
- new
ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo
user must be supplied for the Password Authentication type").build());
- }
- if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
- problems.add(
- new
ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName())
- .explanation("Password must be supplied
for the Password Authentication type").build());
- }
- break;
- case KERBEROS:
- if
(!validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() &&
!validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()
- &&
!validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
- problems.add(new
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
- .explanation("Either Kerberos Password, Kerberos
Credential Service, or Kerberos User Service must be set").build());
- } else if
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() &&
validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
- problems.add(new
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
- .explanation("Kerberos Password and Kerberos
Credential Service should not be filled out at the same time").build());
- } else if
(validationContext.getProperty(KERBEROS_PASSWORD).isSet() &&
!validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()) {
- problems.add(new
ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
- .explanation("Kerberos Principal must be supplied
when principal + password Kerberos authentication is used").build());
- } else if
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() &&
validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
- problems.add(new
ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
- .explanation("Kerberos Principal (for password)
should not be filled out when principal + keytab Kerberos authentication is
used").build());
- } else if
(validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet() &&
validationContext.getProperty(KERBEROS_USER_SERVICE).isSet()) {
- problems.add(new
ValidationResult.Builder().valid(false).subject(KERBEROS_USER_SERVICE.getName())
- .explanation("Kerberos User Service cannot be
specified while also specifying a Kerberos Credential Service").build());
- } else if
(validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() &&
validationContext.getProperty(KERBEROS_PASSWORD).isSet()){
- problems.add(new
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName())
- .explanation("Kerberos Password and Kerberos User
Service should not be filled out at the same time").build());
- } else if
(validationContext.getProperty(KERBEROS_USER_SERVICE).isSet() &&
validationContext.getProperty(KERBEROS_PRINCIPAL).isSet()){
- problems.add(new
ValidationResult.Builder().valid(false).subject(KERBEROS_PRINCIPAL.getName())
- .explanation("Kerberos Principal (for password)
should not be filled out when Kerberos User Service is used").build());
- }
- break;
- default:
- problems.add(new
ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Non
supported Authentication type").build());
- }
-
- return problems;
- }
-
- @OnEnabled
- public void onEnabled(final ConfigurationContext context) throws
InitializationException, IOException, InterruptedException {
- if (!context.getProperty(INSTANCE_NAME).isSet() ||
!context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
- throw new InitializationException("Instance name and Zookeeper
Quorum must be specified");
- }
-
- final KerberosUserService kerberosUserService =
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
- final KerberosCredentialsService kerberosCredentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
- final String instanceName =
context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
- final String zookeepers =
context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
- this.authType = AuthenticationType.valueOf(
context.getProperty(AUTHENTICATION_TYPE).getValue());
-
- final Properties clientConf = new Properties();
- clientConf.setProperty("instance.zookeepers", zookeepers);
- clientConf.setProperty("instance.name", instanceName);
-
- switch(authType){
- case PASSWORD:
- final String accumuloUser =
context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
-
- final AuthenticationToken token = new
PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
-
- this.client =
Accumulo.newClient().from(clientConf).as(accumuloUser, token).build();
- break;
- case KERBEROS:
- if (kerberosUserService != null) {
- this.kerberosUser =
kerberosUserService.createKerberosUser();
- } else if (kerberosCredentialsService != null) {
- this.kerberosUser = new
KerberosKeytabUser(kerberosCredentialsService.getPrincipal(),
kerberosCredentialsService.getKeytab());
- } else {
- this.kerberosUser = new
KerberosPasswordUser(context.getProperty(KERBEROS_PRINCIPAL).getValue(),
context.getProperty(KERBEROS_PASSWORD).getValue());
- }
-
- clientConf.setProperty("sasl.enabled", "true");
- clientConf.setProperty("sasl.qop",
context.getProperty(ACCUMULO_SASL_QOP).getValue());
-
- //Client uses the currently logged in user's security context,
so need to login first.
- Configuration conf = new Configuration();
- conf.set("hadoop.security.authentication", "kerberos");
- UserGroupInformation.setConfiguration(conf);
- final UserGroupInformation clientUgi =
SecurityUtil.getUgiForKerberosUser(conf, kerberosUser);
-
- this.client =
clientUgi.doAs((PrivilegedExceptionAction<AccumuloClient>) () ->
-
Accumulo.newClient().from(clientConf).as(kerberosUser.getPrincipal(), new
KerberosToken()).build());
- break;
- default:
- throw new InitializationException("Not supported
authentication type.");
- }
- }
-
- @Override
- public AccumuloClient getClient() {
- return client;
- }
-
- @Override
- public void renewTgtIfNecessary() {
- if (authType.equals(AuthenticationType.KERBEROS)) {
- SecurityUtil.checkTGTAndRelogin(getLogger(), kerberosUser);
- }
- }
-
- @OnDisabled
- public void shutdown() {
- if (client != null) {
- client.close();
- }
- }
-}
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
deleted file mode 100644
index 0e27be47a5..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ /dev/null
@@ -1,15 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.accumulo.controllerservices.AccumuloService
diff --git
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
b/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
deleted file mode 100644
index 8424da13a0..0000000000
---
a/nifi-extension-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/test/java/org/apache/nifi/accumulo/controllerservices/TestAccumuloService.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
-
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.accumulo.controllerservices;
-
-import org.apache.nifi.kerberos.KerberosCredentialsService;
-import org.apache.nifi.kerberos.KerberosUserService;
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestAccumuloService {
-
- private static final String INSTANCE = "instance";
- private static final String ZOOKEEPER = "zookeeper";
- private static final String PASSWORD = "PASSWORD";
- private static final String USER = "USER";
- private static final String KERBEROS = "KERBEROS";
- private static final String PRINCIPAL = "principal";
- private static final String KERBEROS_PASSWORD = "kerberos_password";
- private static final String NONE = "NONE";
-
- private TestRunner runner;
- private AccumuloService accumuloService;
-
- private final KerberosCredentialsService credentialService =
mock(KerberosCredentialsService.class);
- private final KerberosUserService kerberosUserService =
mock(KerberosUserService.class);
- private final Processor dummyProcessor = mock(Processor.class);
-
- @BeforeEach
- public void init() {
- runner = TestRunners.newTestRunner(dummyProcessor);
- accumuloService = new AccumuloService();
-
- when(credentialService.getIdentifier()).thenReturn("1");
-
when(kerberosUserService.getIdentifier()).thenReturn("kerberosUserService1");
- }
-
- @Test
- public void
testServiceValidWithAuthTypePasswordAndInstanceZookeeperUserPasswordAreSet()
throws InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
- runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
- runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, PASSWORD);
- runner.setProperty(accumuloService, AccumuloService.ACCUMULO_USER,
USER);
- runner.setProperty(accumuloService, AccumuloService.ACCUMULO_PASSWORD,
PASSWORD);
-
- runner.assertValid(accumuloService);
- }
-
- @Test
- public void testServiceNotValidWithInstanceMissing() throws
InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
-
- assertServiceIsInvalidWithErrorMessage("Instance name must be
supplied");
- }
-
- @Test
- public void testServiceNotValidWithZookeeperMissing() throws
InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
-
- assertServiceIsInvalidWithErrorMessage("Zookeepers must be supplied");
- }
-
- @Test
- public void testServiceNotValidWithAuthTypeNone() throws
InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
- runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
- runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, NONE);
-
- assertServiceIsInvalidWithErrorMessage("Non supported Authentication
type");
- }
-
- @Test
- public void testServiceNotValidWithAuthTypePasswordAndUserMissing() throws
InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
- runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
- runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, PASSWORD);
- runner.setProperty(accumuloService, AccumuloService.ACCUMULO_PASSWORD,
PASSWORD);
-
- assertServiceIsInvalidWithErrorMessage("Accumulo user must be
supplied");
- }
-
- @Test
- public void testServiceNotValidWithAuthTypePasswordAndPasswordMissing()
throws InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
- runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
- runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, PASSWORD);
- runner.setProperty(accumuloService, AccumuloService.ACCUMULO_USER,
USER);
-
- assertServiceIsInvalidWithErrorMessage("Password must be supplied");
- }
-
- @Test
- public void
testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndCredentialServiceMissing()
throws InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
- runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
- runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
-
- assertServiceIsInvalidWithErrorMessage("Either Kerberos Password,
Kerberos Credential Service, or Kerberos User Service must be set");
- }
-
- @Test
- public void
testServiceNotValidWithAuthTypeKerberosAndKerberosPrincipalMissing() throws
InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
- runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
- runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
- runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD,
KERBEROS_PASSWORD);
-
- assertServiceIsInvalidWithErrorMessage("Kerberos Principal must be
supplied");
- }
-
- @Test
- public void
testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndCredentialServiceSet()
throws InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
- runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
- runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
- runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD,
KERBEROS_PASSWORD);
- runner.addControllerService("kerberos-credentials-service",
credentialService);
- runner.setProperty(accumuloService,
AccumuloService.KERBEROS_CREDENTIALS_SERVICE,
credentialService.getIdentifier());
-
- assertServiceIsInvalidWithErrorMessage("should not be filled out at
the same time");
- }
-
- @Test
- public void
testServiceNotValidWithAuthTypeKerberosAndPrincipalAndCredentialServiceSet()
throws InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
- runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
- runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
- runner.setProperty(accumuloService,
AccumuloService.KERBEROS_PRINCIPAL, PRINCIPAL);
- runner.addControllerService("kerberos-credentials-service",
credentialService);
- runner.setProperty(accumuloService,
AccumuloService.KERBEROS_CREDENTIALS_SERVICE,
credentialService.getIdentifier());
-
- assertServiceIsInvalidWithErrorMessage("Kerberos Principal (for
password) should not be filled out");
- }
-
- @Test
- public void
testServiceNotValidWithAuthTypeKerberosAndKerberosPasswordAndUserServiceSet()
throws InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
- runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
- runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
- runner.setProperty(accumuloService,
AccumuloService.KERBEROS_PRINCIPAL, PRINCIPAL);
- runner.setProperty(accumuloService, AccumuloService.KERBEROS_PASSWORD,
KERBEROS_PASSWORD);
- runner.addControllerService("kerberos-user-service",
kerberosUserService);
- runner.setProperty(accumuloService,
AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
-
- assertServiceIsInvalidWithErrorMessage("should not be filled out at
the same time");
- }
-
- @Test
- public void
testServiceNotValidWithAuthTypeKerberosAndCredentialServiceAndUserServiceSet()
throws InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
- runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
- runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
-
- runner.addControllerService("kerberos-credentials-service",
credentialService);
- runner.setProperty(accumuloService,
AccumuloService.KERBEROS_CREDENTIALS_SERVICE,
credentialService.getIdentifier());
-
- runner.addControllerService("kerberos-user-service",
kerberosUserService);
- runner.setProperty(accumuloService,
AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
-
- assertServiceIsInvalidWithErrorMessage("Kerberos User Service cannot
be specified while also specifying a Kerberos Credential Service");
- }
-
- @Test
- public void
testServiceIsValidWithAuthTypeKerberosAndKerberosUserServiceSet() throws
InitializationException {
- runner.addControllerService("accumulo-connector-service",
accumuloService);
- runner.setProperty(accumuloService, AccumuloService.INSTANCE_NAME,
INSTANCE);
- runner.setProperty(accumuloService, AccumuloService.ZOOKEEPER_QUORUM,
ZOOKEEPER);
- runner.setProperty(accumuloService,
AccumuloService.AUTHENTICATION_TYPE, KERBEROS);
- runner.addControllerService("kerberos-user-service",
kerberosUserService);
- runner.enableControllerService(kerberosUserService);
- runner.setProperty(accumuloService,
AccumuloService.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
- runner.assertValid(accumuloService);
- }
-
- private void assertServiceIsInvalidWithErrorMessage(String errorMessage) {
- Exception exception = assertThrows(IllegalStateException.class, () ->
runner.enableControllerService(accumuloService));
- assertTrue(exception.getMessage().contains(errorMessage));
- }
-}
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-accumulo-bundle/pom.xml
b/nifi-extension-bundles/nifi-accumulo-bundle/pom.xml
deleted file mode 100644
index 5c64d96a56..0000000000
--- a/nifi-extension-bundles/nifi-accumulo-bundle/pom.xml
+++ /dev/null
@@ -1,85 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor
- license agreements. See the NOTICE file distributed with this work for
additional
- information regarding copyright ownership. The ASF licenses this file to
- You under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-standard-services-api-bom</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../nifi-standard-services-api-bom</relativePath>
- </parent>
-
- <properties>
- <accumulo.version>2.1.2</accumulo.version>
- <guava.version>33.2.0-jre</guava.version>
- </properties>
-
- <artifactId>nifi-accumulo-bundle</artifactId>
- <packaging>pom</packaging>
-
- <modules>
- <module>nifi-accumulo-services-api</module>
- <module>nifi-accumulo-services-api-nar</module>
- <module>nifi-accumulo-services</module>
- <module>nifi-accumulo-services-nar</module>
- <module>nifi-accumulo-processors</module>
- <module>nifi-accumulo-nar</module>
- </modules>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-accumulo-processors</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- <!-- Override nimbus-jose-jwt 9.8.1 from hadoop-auth -->
- <dependency>
- <groupId>com.nimbusds</groupId>
- <artifactId>nimbus-jose-jwt</artifactId>
- <version>9.37.3</version>
- </dependency>
- <!-- Override Hadoop from accumulo-core -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client-api</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client-runtime</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo-core</artifactId>
- <version>${accumulo.version}</version>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-1.2-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- Override Guava 31.1 -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-</project>
diff --git a/nifi-extension-bundles/pom.xml b/nifi-extension-bundles/pom.xml
index c999eec3ac..fa85ffa68c 100755
--- a/nifi-extension-bundles/pom.xml
+++ b/nifi-extension-bundles/pom.xml
@@ -78,7 +78,6 @@
<module>nifi-prometheus-bundle</module>
<module>nifi-sql-reporting-bundle</module>
<module>nifi-hazelcast-bundle</module>
- <module>nifi-accumulo-bundle</module>
<module>nifi-asn1-bundle</module>
<module>nifi-pgp-bundle</module>
<module>nifi-hashicorp-vault-bundle</module>