This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 26b2036 NIFI-6552 - Kudu Put Operations
26b2036 is described below
commit 26b203616e79032bdcd47ac927d488a2ce68c656
Author: SandishKumarHN <[email protected]>
AuthorDate: Sun Jul 28 18:15:07 2019 -0700
NIFI-6552 - Kudu Put Operations
Signed-off-by: Pierre Villard <[email protected]>
This closes #3610.
---
.../nifi-kudu-bundle/nifi-kudu-processors/pom.xml | 2 +-
.../processors/kudu/AbstractKuduProcessor.java | 263 +++++++++++++++++++++
.../apache/nifi/processors/kudu/OperationType.java | 4 +-
.../org/apache/nifi/processors/kudu/PutKudu.java | 219 ++++-------------
.../apache/nifi/processors/kudu/MockPutKudu.java | 41 +++-
.../apache/nifi/processors/kudu/TestPutKudu.java | 69 ++++--
6 files changed, 401 insertions(+), 197 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
index 92664ff..e54c531 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -46,7 +46,7 @@
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
- <version>1.7.0</version>
+ <version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
new file mode 100644
index 0000000..c1b1a81
--- /dev/null
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowError;
+import org.apache.kudu.client.SessionConfiguration;
+import org.apache.kudu.client.Delete;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.Update;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.serialization.record.Record;
+
+import javax.security.auth.login.LoginException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.List;
+
+public abstract class AbstractKuduProcessor extends AbstractProcessor {
+
+    /** Mandatory, non-empty list of Kudu master addresses; resolved against the variable registry. */
+    static final PropertyDescriptor KUDU_MASTERS = new Builder()
+            .name("Kudu Masters")
+            .description("Comma separated addresses of the Kudu masters to connect to.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    /** Optional controller service that supplies the principal and keytab used for the Kerberos login. */
+    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new Builder()
+            .name("kerberos-credentials-service")
+            .displayName("Kerberos Credentials Service")
+            .description("Specifies the Kerberos Credentials to use for authentication")
+            .identifiesControllerService(KerberosCredentialsService.class)
+            .required(false)
+            .build();
+
+    /** Optional time period applied as the client's default operation timeout; defaults to the Kudu client constant. */
+    static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new Builder()
+            .name("kudu-operations-timeout-ms")
+            .displayName("Kudu Operation Timeout")
+            .description("Default timeout used for user operations (using sessions and scanners)")
+            .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS + "ms")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .build();
+
+    /**
+     * Optional time period; defaults to the Kudu client's keep-alive period constant.
+     * NOTE(review): the description text repeats the operation-timeout wording, and buildClient
+     * hands this value to defaultSocketReadTimeoutMs - confirm the intended meaning and wording.
+     */
+    static final PropertyDescriptor KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS = new Builder()
+            .name("kudu-keep-alive-period-timeout-ms")
+            .displayName("Kudu Keep Alive Period Timeout")
+            .description("Default timeout used for user operations")
+            .defaultValue(AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS + "ms")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .build();
+
+    /** Client used for all Kudu access; assigned by createKuduClient and reset to null by shutdown. */
+    protected KuduClient kuduClient;
+
+    /** Kerberos user logged in by createKuduClient; null when no login has happened or after shutdown. */
+    private volatile KerberosUser kerberosUser;
+
+    /** @return the currently logged-in Kerberos user, or null if there is none */
+    public KerberosUser getKerberosUser() {
+        return kerberosUser;
+    }
+
+    /** @return the current Kudu client, or null if none has been created */
+    public KuduClient getKuduClient() {
+        return kuduClient;
+    }
+
+    /**
+     * Creates the Kudu client for this processor and stores it in {@code kuduClient}.
+     * <p>
+     * Without a Kerberos credentials service the client is built directly. With one, the
+     * principal/keytab pair is logged in first and the client is built inside a
+     * {@link KerberosAction} so that it is created as that user.
+     *
+     * @param context supplies the Kudu masters, the optional credentials service and the timeouts
+     * @throws LoginException if the Kerberos login fails
+     */
+    public void createKuduClient(ProcessContext context) throws LoginException {
+        final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+        final KerberosCredentialsService credentialsService =
+                context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        if (credentialsService == null) {
+            // Unsecured cluster: a client is still required. Returning without building one
+            // would leave kuduClient null and break the first use of getKuduClient().
+            this.kuduClient = buildClient(kuduMasters, context);
+            return;
+        }
+
+        final String keytab = credentialsService.getKeytab();
+        final String principal = credentialsService.getPrincipal();
+        kerberosUser = loginKerberosUser(principal, keytab);
+
+        final KerberosAction<KuduClient> kerberosAction =
+                new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger());
+        this.kuduClient = kerberosAction.execute();
+    }
+
+
+    /**
+     * Builds a Kudu client for the given masters using the timeouts configured on the processor.
+     *
+     * @param masters comma separated Kudu master addresses
+     * @param context supplies the operation-timeout and keep-alive-period property values
+     * @return the newly built client
+     */
+    protected KuduClient buildClient(final String masters, final ProcessContext context) {
+        final int operationTimeoutMillis =
+                context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        // The keep-alive period property is the value applied as the socket read timeout below.
+        // NOTE(review): confirm defaultSocketReadTimeoutMs is still honoured by the kudu-client version in use.
+        final int keepAlivePeriodMillis =
+                context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+        return new KuduClient.KuduClientBuilder(masters)
+                .defaultOperationTimeoutMs(operationTimeoutMillis)
+                .defaultSocketReadTimeoutMs(keepAlivePeriodMillis)
+                .build();
+    }
+
+    /**
+     * Flushes (or closes) the session and appends any resulting row errors to {@code rowErrors}.
+     * <p>
+     * In AUTO_FLUSH_BACKGROUND mode the errors are taken from the session's pending errors;
+     * in every other mode they are taken from the responses returned by the flush/close call.
+     *
+     * @param kuduSession session to flush or close
+     * @param close       true to close the session, false to only flush it
+     * @param rowErrors   list that receives the row errors found
+     * @throws KuduException if the flush or close fails
+     */
+    protected void flushKuduSession(final KuduSession kuduSession, boolean close, final List<RowError> rowErrors) throws KuduException {
+        final List<OperationResponse> responses;
+        if (close) {
+            responses = kuduSession.close();
+        } else {
+            responses = kuduSession.flush();
+        }
+
+        if (kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+            rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
+            return;
+        }
+
+        for (final OperationResponse response : responses) {
+            if (response.hasRowError()) {
+                rowErrors.add(response.getRowError());
+            }
+        }
+    }
+
+    /**
+     * Creates a keytab-based Kerberos user and logs it in.
+     *
+     * @param principal Kerberos principal
+     * @param keytab    keytab passed to the keytab user together with the principal
+     * @return the logged-in user
+     * @throws LoginException if the login fails
+     */
+    protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
+        final KerberosUser keytabUser = new KerberosKeytabUser(principal, keytab);
+        keytabUser.login();
+        return keytabUser;
+    }
+
+    /**
+     * Releases the processor's external resources when it is stopped: closes the Kudu client
+     * if one exists and, even if that close throws, logs out the Kerberos user if one exists.
+     *
+     * @throws Exception if closing the client or logging out fails
+     */
+    @OnStopped
+    public void shutdown() throws Exception {
+        try {
+            if (this.kuduClient == null) {
+                // Nothing to close; the finally block still handles the Kerberos logout.
+                return;
+            }
+            getLogger().debug("Closing KuduClient");
+            this.kuduClient.close();
+            this.kuduClient = null;
+        } finally {
+            if (kerberosUser != null) {
+                kerberosUser.logout();
+                kerberosUser = null;
+            }
+        }
+    }
+
+ @VisibleForTesting
+ protected void buildPartialRow(Schema schema, PartialRow row, Record
record, List<String> fieldNames, Boolean ignoreNull) {
+
+ for (String colName : fieldNames) {
+ int colIdx = this.getColumnIndex(schema, colName);
+ if (colIdx != -1) {
+ ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
+ Type colType = colSchema.getType();
+
+ if (record.getValue(colName) == null) {
+ if (schema.getColumnByIndex(colIdx).isKey()) {
+ throw new
IllegalArgumentException(String.format("Can't set primary key column %s to null
", colName));
+ } else if(!schema.getColumnByIndex(colIdx).isNullable()) {
+ throw new
IllegalArgumentException(String.format("Can't set primary key column %s to null
", colName));
+ }
+
+ if (!ignoreNull) {
+ row.setNull(colName);
+ continue;
+ }
+ } else {
+ switch
(colType.getDataType(colSchema.getTypeAttributes())) {
+ case BOOL:
+ row.addBoolean(colIdx,
record.getAsBoolean(colName));
+ break;
+ case FLOAT:
+ row.addFloat(colIdx, record.getAsFloat(colName));
+ break;
+ case DOUBLE:
+ row.addDouble(colIdx, record.getAsDouble(colName));
+ break;
+ case BINARY:
+ row.addBinary(colIdx,
record.getAsString(colName).getBytes());
+ break;
+ case INT8:
+ row.addByte(colIdx,
record.getAsInt(colName).byteValue());
+ break;
+ case INT16:
+ row.addShort(colIdx,
record.getAsInt(colName).shortValue());
+ break;
+ case INT32:
+ row.addInt(colIdx, record.getAsInt(colName));
+ break;
+ case INT64:
+ case UNIXTIME_MICROS:
+ row.addLong(colIdx, record.getAsLong(colName));
+ break;
+ case STRING:
+ row.addString(colIdx, record.getAsString(colName));
+ break;
+ case DECIMAL32:
+ case DECIMAL64:
+ case DECIMAL128:
+ row.addDecimal(colIdx, new
BigDecimal(record.getAsString(colName)));
+ break;
+ default:
+ throw new
IllegalStateException(String.format("unknown column type %s", colType));
+ }
+ }
+ }
+ }
+ }
+
+    /**
+     * Looks up the index of {@code colName} in {@code columns}.
+     *
+     * @return the column index, or -1 if the lookup throws for any reason
+     *         (presumably because the column is not part of the schema)
+     */
+    private int getColumnIndex(Schema columns, String colName) {
+        int index;
+        try {
+            index = columns.getColumnIndex(colName);
+        } catch (Exception ex) {
+            // Deliberate best effort: a failed lookup is reported as "no such column".
+            index = -1;
+        }
+        return index;
+    }
+
+ protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames, Boolean ignoreNull) {
+ Upsert upsert = kuduTable.newUpsert();
+ buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record,
fieldNames, ignoreNull);
+ return upsert;
+ }
+
+ protected Insert insertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames, Boolean ignoreNull) {
+ Insert insert = kuduTable.newInsert();
+ buildPartialRow(kuduTable.getSchema(), insert.getRow(), record,
fieldNames, ignoreNull);
+ return insert;
+ }
+
+ protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record,
List<String> fieldNames, Boolean ignoreNull) {
+ Delete delete = kuduTable.newDelete();
+ buildPartialRow(kuduTable.getSchema(), delete.getRow(), record,
fieldNames, ignoreNull);
+ return delete;
+ }
+
+ protected Update updateRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames, Boolean ignoreNull) {
+ Update update = kuduTable.newUpdate();
+ buildPartialRow(kuduTable.getSchema(), update.getRow(), record,
fieldNames, ignoreNull);
+ return update;
+ }
+
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java
index 4ab466e..08bcd77 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java
@@ -20,5 +20,7 @@ package org.apache.nifi.processors.kudu;
public enum OperationType {
INSERT,
INSERT_IGNORE,
- UPSERT;
+ UPSERT,
+ UPDATE,
+ DELETE;
}
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 900d3d3..1b47337 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -17,20 +17,13 @@
package org.apache.nifi.processors.kudu;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
-import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
-import org.apache.kudu.client.Upsert;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
@@ -39,20 +32,16 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
-import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
-import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@@ -62,10 +51,8 @@ import org.apache.nifi.serialization.record.RecordSet;
import javax.security.auth.login.LoginException;
import java.io.IOException;
import java.io.InputStream;
-import java.math.BigDecimal;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -82,14 +69,8 @@ import java.util.stream.Collectors;
"to the specified Kudu's table. The schema for the table must be
provided in the processor properties or from your source." +
" If any error occurs while reading records from the input, or writing
records to Kudu, the FlowFile will be routed to failure")
@WritesAttribute(attribute = "record.count", description = "Number of records
written to Kudu")
-public class PutKudu extends AbstractProcessor {
- protected static final PropertyDescriptor KUDU_MASTERS = new Builder()
- .name("Kudu Masters")
- .description("List all kudu masters's ip with port (e.g. 7051), comma
separated")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
+
+public class PutKudu extends AbstractKuduProcessor {
protected static final PropertyDescriptor TABLE_NAME = new Builder()
.name("Table Name")
@@ -99,20 +80,13 @@ public class PutKudu extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new
Builder()
- .name("kerberos-credentials-service")
- .displayName("Kerberos Credentials Service")
- .description("Specifies the Kerberos Credentials to use for
authentication")
- .required(false)
- .identifiesControllerService(KerberosCredentialsService.class)
- .build();
-
public static final PropertyDescriptor RECORD_READER = new Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The service for reading records from incoming flow
files.")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder()
@@ -127,9 +101,10 @@ public class PutKudu extends AbstractProcessor {
protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
.name("Insert Operation")
+ .displayName("Kudu Operation Type")
.description("Specify operationType for this processor. Insert-Ignore
will ignore duplicated rows")
- .allowableValues(OperationType.values())
.defaultValue(OperationType.INSERT.toString())
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -142,10 +117,11 @@ public class PutKudu extends AbstractProcessor {
"MANUAL_FLUSH: the call returns when the operation has been added
to the buffer, else it throws a KuduException if the buffer is full.")
.allowableValues(SessionConfiguration.FlushMode.values())
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
- protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new
PropertyDescriptor.Builder()
+ protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new
Builder()
.name("FlowFiles per Batch")
.description("The maximum number of FlowFiles to process in a single
execution, between 1 - 100000. " +
"Depending on your memory size, and data size per row set an
appropriate batch size " +
@@ -157,7 +133,7 @@ public class PutKudu extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
- protected static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ protected static final PropertyDescriptor BATCH_SIZE = new Builder()
.name("Batch Size")
.displayName("Max Records per Batch")
.description("The maximum number of Records to process in a single
Kudu-client batch, between 1 - 100000. " +
@@ -169,6 +145,15 @@ public class PutKudu extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+    /**
+     * Whether null record values are skipped instead of being written as NULL; evaluated per FlowFile.
+     * No fixed allowable-value list is declared, because this property accepts expression language
+     * and a constrained value set would reject anything other than the literal values
+     * (NOTE(review): confirm against NiFi's allowable-value validation). The boolean validator
+     * still checks literal values.
+     */
+    protected static final PropertyDescriptor IGNORE_NULL = new Builder()
+            .name("Ignore NULL")
+            .description("Ignore NULL on Kudu Put Operation, Update only non-Null columns if set true")
+            .defaultValue("false")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
protected static final Relationship REL_SUCCESS = new
Relationship.Builder()
.name("success")
@@ -183,12 +168,10 @@ public class PutKudu extends AbstractProcessor {
protected OperationType operationType;
protected SessionConfiguration.FlushMode flushMode;
+
protected int batchSize = 100;
protected int ffbatch = 1;
- protected KuduClient kuduClient;
- private volatile KerberosUser kerberosUser;
-
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -201,7 +184,9 @@ public class PutKudu extends AbstractProcessor {
properties.add(FLUSH_MODE);
properties.add(FLOWFILE_BATCH_SIZE);
properties.add(BATCH_SIZE);
-
+ properties.add(IGNORE_NULL);
+ properties.add(KUDU_OPERATION_TIMEOUT_MS);
+ properties.add(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS);
return properties;
}
@@ -213,58 +198,15 @@ public class PutKudu extends AbstractProcessor {
return rels;
}
+ protected KerberosUser kerberosUser;
+ protected KuduSession kuduSession;
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException,
LoginException {
- final String kuduMasters =
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
- operationType =
OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
batchSize =
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
ffbatch =
context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flushMode =
SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
-
- getLogger().debug("Setting up Kudu connection...");
- final KerberosCredentialsService credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
- kuduClient = createClient(kuduMasters, credentialsService);
- getLogger().debug("Kudu connection successfully initialized");
- }
-
- protected KuduClient createClient(final String masters, final
KerberosCredentialsService credentialsService) throws LoginException {
- if (credentialsService == null) {
- return buildClient(masters);
- }
-
- final String keytab = credentialsService.getKeytab();
- final String principal = credentialsService.getPrincipal();
- kerberosUser = loginKerberosUser(principal, keytab);
-
- final KerberosAction<KuduClient> kerberosAction = new
KerberosAction<>(kerberosUser, () -> buildClient(masters), getLogger());
- return kerberosAction.execute();
- }
-
- protected KuduClient buildClient(final String masters) {
- return new KuduClient.KuduClientBuilder(masters).build();
- }
-
- protected KerberosUser loginKerberosUser(final String principal, final
String keytab) throws LoginException {
- final KerberosUser kerberosUser = new KerberosKeytabUser(principal,
keytab);
- kerberosUser.login();
- return kerberosUser;
- }
-
- @OnStopped
- public final void closeClient() throws KuduException, LoginException {
- try {
- if (kuduClient != null) {
- getLogger().debug("Closing KuduClient");
- kuduClient.close();
- kuduClient = null;
- }
- } finally {
- if (kerberosUser != null) {
- kerberosUser.logout();
- kerberosUser = null;
- }
- }
+ createKuduClient(context);
}
@Override
@@ -273,6 +215,7 @@ public class PutKudu extends AbstractProcessor {
if (flowFiles.isEmpty()) {
return;
}
+ kerberosUser = getKerberosUser();
final KerberosUser user = kerberosUser;
if (user == null) {
@@ -290,9 +233,11 @@ public class PutKudu extends AbstractProcessor {
}
private void trigger(final ProcessContext context, final ProcessSession
session, final List<FlowFile> flowFiles) throws ProcessException {
- final KuduSession kuduSession = getKuduSession(kuduClient);
final RecordReaderFactory recordReaderFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final KuduClient kuduClient = getKuduClient();
+ kuduSession = getKuduSession(kuduClient);
+
final Map<FlowFile, Integer> numRecords = new HashMap<>();
final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
@@ -300,6 +245,8 @@ public class PutKudu extends AbstractProcessor {
int numBuffered = 0;
final List<RowError> pendingRowErrors = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
+ operationType =
OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
+ Boolean ignoreNull =
Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
try (final InputStream in = session.read(flowFile);
final RecordReader recordReader =
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final List<String> fieldNames =
recordReader.getSchema().getFieldNames();
@@ -309,10 +256,7 @@ public class PutKudu extends AbstractProcessor {
Record record = recordSet.next();
while (record != null) {
- Operation operation = operationType == OperationType.UPSERT
- ? upsertRecordToKudu(kuduTable, record, fieldNames)
- : insertRecordToKudu(kuduTable, record, fieldNames);
-
+ Operation operation = getKuduOperationType(operationType,
record, fieldNames, ignoreNull, kuduTable);
// We keep track of mappings between Operations and their
origins,
// so that we know which FlowFiles should be marked
failure after buffered flush.
operationFlowFileMap.put(operation, flowFile);
@@ -399,91 +343,20 @@ public class PutKudu extends AbstractProcessor {
return kuduSession;
}
- private void flushKuduSession(final KuduSession kuduSession, boolean
close, final List<RowError> rowErrors) throws KuduException {
- final List<OperationResponse> responses = close ? kuduSession.close()
: kuduSession.flush();
-
- if (kuduSession.getFlushMode() ==
SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
-
rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
- } else {
- responses.stream()
- .filter(OperationResponse::hasRowError)
- .map(OperationResponse::getRowError)
- .forEach(rowErrors::add);
- }
- }
-
-
-
- protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames) {
- Upsert upsert = kuduTable.newUpsert();
- this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record,
fieldNames);
- return upsert;
- }
-
- protected Insert insertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames) {
- Insert insert = kuduTable.newInsert();
- this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record,
fieldNames);
- return insert;
- }
-
- void buildPartialRow(Schema schema, PartialRow row, Record record,
List<String> fieldNames) {
- for (String colName : fieldNames) {
- int colIdx = this.getColumnIndex(schema, colName);
- if (colIdx != -1) {
- ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
- Type colType = colSchema.getType();
-
- if (record.getValue(colName) == null) {
- row.setNull(colName);
- continue;
- }
-
- switch (colType.getDataType(colSchema.getTypeAttributes())) {
- case BOOL:
- row.addBoolean(colIdx, record.getAsBoolean(colName));
- break;
- case FLOAT:
- row.addFloat(colIdx, record.getAsFloat(colName));
- break;
- case DOUBLE:
- row.addDouble(colIdx, record.getAsDouble(colName));
- break;
- case BINARY:
- row.addBinary(colIdx,
record.getAsString(colName).getBytes());
- break;
- case INT8:
- row.addByte(colIdx,
record.getAsInt(colName).byteValue());
- break;
- case INT16:
- row.addShort(colIdx,
record.getAsInt(colName).shortValue());
- break;
- case INT32:
- row.addInt(colIdx, record.getAsInt(colName));
- break;
- case INT64:
- case UNIXTIME_MICROS:
- row.addLong(colIdx, record.getAsLong(colName));
- break;
- case STRING:
- row.addString(colIdx, record.getAsString(colName));
- break;
- case DECIMAL32:
- case DECIMAL64:
- case DECIMAL128:
- row.addDecimal(colIdx, new
BigDecimal(record.getAsString(colName)));
- break;
- default:
- throw new IllegalStateException(String.format("unknown
column type %s", colType));
- }
- }
- }
- }
-
- private int getColumnIndex(Schema columns, String colName) {
- try {
- return columns.getColumnIndex(colName);
- } catch (Exception ex) {
- return -1;
+    /**
+     * Builds the Kudu operation matching {@code operationType} for one record.
+     * INSERT and INSERT_IGNORE both produce an insert operation.
+     *
+     * @throws IllegalArgumentException if the operation type is not handled here
+     */
+    private Operation getKuduOperationType(OperationType operationType, Record record, List<String> fieldNames, Boolean ignoreNull, KuduTable kuduTable) {
+        switch (operationType) {
+            case INSERT:
+            case INSERT_IGNORE:
+                return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
+            case UPSERT:
+                return upsertRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
+            case UPDATE:
+                return updateRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
+            case DELETE:
+                return deleteRecordFromKudu(kuduTable, record, fieldNames, ignoreNull);
+            default:
+                throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
         }
     }
}
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index 32a943a..b5ce71f 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -17,11 +17,14 @@
package org.apache.nifi.processors.kudu;
-import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Delete;
+import org.apache.kudu.client.Insert;
import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.Update;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.Record;
@@ -30,14 +33,15 @@ import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
-import java.util.LinkedList;
import java.util.List;
+import java.util.LinkedList;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MockPutKudu extends PutKudu {
+
private KuduSession session;
private LinkedList<Insert> insertQueue;
@@ -58,18 +62,41 @@ public class MockPutKudu extends PutKudu {
}
@Override
- protected Insert insertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames) {
+ protected Insert insertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames, Boolean ignoreNull) {
Insert insert = insertQueue.poll();
return insert != null ? insert : mock(Insert.class);
}
@Override
- protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames) {
+ protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames, Boolean ignoreNull) {
return mock(Upsert.class);
}
@Override
- protected KuduClient buildClient(final String masters) {
+ protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record,
List<String> fieldNames, Boolean ignoreNull) {
+ return mock(Delete.class);
+ }
+
+ @Override
+ protected Update updateRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames, Boolean ignoreNull) {
+ return mock(Update.class);
+ }
+
+    /**
+     * Test double: returns a mocked client whose openTable call yields a mocked table,
+     * so no connection to a Kudu cluster is attempted.
+     */
+    @Override
+    public KuduClient buildClient(final String masters, ProcessContext context) {
+        final KuduClient mockedClient = mock(KuduClient.class);
+        final KuduTable mockedTable = mock(KuduTable.class);
+
+        try {
+            when(mockedClient.openTable(anyString())).thenReturn(mockedTable);
+        } catch (final Exception e) {
+            // openTable declares a checked exception; stubbing it should never actually throw.
+            throw new AssertionError(e);
+        }
+
+        return mockedClient;
+    }
+
+ @Override
+ public KuduClient getKuduClient() {
final KuduClient client = mock(KuduClient.class);
try {
@@ -138,4 +165,4 @@ public class MockPutKudu extends PutKudu {
protected KuduSession getKuduSession(KuduClient client) { // test stub: the client argument is not used
return session; // hands back this mock's session field
}
-}
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 322f040..ac7c0cd 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -17,7 +17,7 @@
package org.apache.nifi.processors.kudu;
-import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
+import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
@@ -74,7 +74,7 @@ import static
org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -86,20 +86,23 @@ public class TestPutKudu {
public static final String TABLE_SCHEMA =
"id,stringVal,num32Val,doubleVal";
private TestRunner testRunner;
+
private MockPutKudu processor;
+
private MockRecordParser readerFactory;
@Before // Builds a fresh MockPutKudu and TestRunner, then applies the default properties.
- public void setUp() {
+ public void setUp() throws InitializationException {
processor = new MockPutKudu();
testRunner = TestRunners.newTestRunner(processor);
setUpTestRunner(testRunner); // sets the default property values on the runner
}
- private void setUpTestRunner(TestRunner testRunner) {
+ private void setUpTestRunner(TestRunner testRunner) throws
InitializationException { // Sets the default PutKudu properties on the given runner.
testRunner.setProperty(PutKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
testRunner.setProperty(PutKudu.KUDU_MASTERS, DEFAULT_MASTERS);
testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, SKIP_HEAD_LINE);
+ testRunner.setProperty(PutKudu.IGNORE_NULL, "true"); // new property in this change; tests default it to "true"
testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory"); // reader is referenced by this identifier
testRunner.setProperty(PutKudu.INSERT_OPERATION,
OperationType.INSERT.toString()); // default operation is INSERT; individual tests override it
}
@@ -173,7 +176,6 @@ public class TestPutKudu {
assertTrue(proc.loggedOut());
}
-
@Test
public void testInsecureClient() throws InitializationException {
createRecordReader(1);
@@ -327,6 +329,42 @@ public class TestPutKudu {
}
@Test // Runs one flow file through the processor with the operation set to DELETE via an attribute.
+ public void testDeleteFlowFiles() throws Exception {
+ createRecordReader(50); // presumably registers a reader yielding 50 records (matches the count asserted below) - confirm
+ testRunner.setProperty(PutKudu.INSERT_OPERATION,
"${kudu.record.delete}"); // operation is given as an expression referencing a flow file attribute
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put("kudu.record.delete", "DELETE"); // value the expression above refers to
+
+ testRunner.enqueue("string".getBytes(), attributes);
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); // exactly one flow file, routed to success
+ MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
+
+ flowFile.assertContentEquals("string".getBytes()); // content must equal what was enqueued
+ flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); // record-count attribute must read "50"
+ }
+
+ @Test // Runs one flow file through the processor with the operation set to UPDATE via an attribute.
+ public void testUpdateFlowFiles() throws Exception {
+ createRecordReader(50); // presumably registers a reader yielding 50 records (matches the count asserted below) - confirm
+ testRunner.setProperty(PutKudu.INSERT_OPERATION,
"${kudu.record.update}"); // operation is given as an expression referencing a flow file attribute
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put("kudu.record.update", "UPDATE"); // value the expression above refers to
+
+ testRunner.enqueue("string".getBytes(), attributes);
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); // exactly one flow file, routed to success
+ MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
+
+ flowFile.assertContentEquals("string".getBytes()); // content must equal what was enqueued
+ flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); // record-count attribute must read "50"
+ }
+
+ @Test // Calls the buildPartialRow test helper with non-null id, name and age; makes no explicit assertions.
public void testBuildRow() {
buildPartialRow((long) 1, "foo", (short) 10); // casts select the boxed Long / Short parameters of the helper
}
@@ -348,11 +386,11 @@ public class TestPutKudu {
private void buildPartialRow(Long id, String name, Short age) {
final Schema kuduSchema = new Schema(Arrays.asList(
- new ColumnSchemaBuilder("id", Type.INT64).key(true).build(),
- new ColumnSchemaBuilder("name",
Type.STRING).nullable(true).build(),
- new ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(),
- new ColumnSchemaBuilder("updated_at",
Type.UNIXTIME_MICROS).nullable(false).build(),
- new ColumnSchemaBuilder("score",
Type.DECIMAL).nullable(true).typeAttributes(
+ new ColumnSchema.ColumnSchemaBuilder("id",
Type.INT64).key(true).build(),
+ new ColumnSchema.ColumnSchemaBuilder("name",
Type.STRING).nullable(true).build(),
+ new ColumnSchema.ColumnSchemaBuilder("age",
Type.INT16).nullable(false).build(),
+ new ColumnSchema.ColumnSchemaBuilder("updated_at",
Type.UNIXTIME_MICROS).nullable(false).build(),
+ new ColumnSchema.ColumnSchemaBuilder("score",
Type.DECIMAL).nullable(true).typeAttributes(
new
ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build()
).build()));
@@ -369,11 +407,12 @@ public class TestPutKudu {
values.put("age", age);
values.put("updated_at", System.currentTimeMillis() * 1000);
values.put("score", 10000L);
- new PutKudu().buildPartialRow(
+ processor.buildPartialRow(
kuduSchema,
kuduSchema.newPartialRow(),
new MapRecord(schema, values),
- schema.getFieldNames()
+ schema.getFieldNames(),
+ true
);
}
@@ -393,12 +432,12 @@ public class TestPutKudu {
EXCEPTION
}
- private LinkedList<OperationResponse> queueInsert(MockPutKudu kudu,
KuduSession session, boolean sync, ResultCode... results) throws Exception {
+ private LinkedList<OperationResponse> queueInsert(MockPutKudu putKudu,
KuduSession session, boolean sync, ResultCode... results) throws Exception {
LinkedList<OperationResponse> responses = new LinkedList<>();
for (ResultCode result : results) {
boolean ok = result == OK;
Tuple<Insert, OperationResponse> tuple = insert(ok);
- kudu.queue(tuple.getKey());
+ putKudu.queue(tuple.getKey());
if (result == EXCEPTION) {
when(session.apply(tuple.getKey())).thenThrow(mock(KuduException.class));
@@ -526,6 +565,7 @@ public class TestPutKudu {
testRunner.run(numFlowFiles);
testRunner.assertTransferCount(PutKudu.REL_FAILURE, 3);
+
List<MockFlowFile> failedFlowFiles =
testRunner.getFlowFilesForRelationship(PutKudu.REL_FAILURE);
failedFlowFiles.get(0).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "2");
failedFlowFiles.get(1).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, sync ?
"1" : "2");
@@ -557,7 +597,6 @@ public class TestPutKudu {
testKuduPartialFailure(FlushMode.MANUAL_FLUSH);
}
-
public static class MockKerberosCredentialsService extends
AbstractControllerService implements KerberosCredentialsService {
private final String keytab;
private final String principal;