This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1898ad4  NIFI-6895: Fix PutKudu processor concurrency issues
1898ad4 is described below

commit 1898ad44be3186410e01baef60325a6fa5e81187
Author: Grant Henke <[email protected]>
AuthorDate: Mon Dec 2 11:41:18 2019 -0600

    NIFI-6895: Fix PutKudu processor concurrency issues
    
    Calls to `trigger()` may be made concurrently from different threads,
    however the PutKudu processor is storing the `kuduSession`
    in a class level field. This can result in the logging issue reported in
    NIFI-6895 and likely other unusual anomalies including performance
    issues depending on the processor configuration.
    
    Additionally the `operationType` was also stored in a class level field
    and could be set concurrently resulting in the incorrect operation type
    being used.
    
    This patch fixes the issue by moving both kuduSession and operationType
    to be local. Additionally some minor code cleanup was included.
    
    An integration test, ITPutKudu, was added and used to manually verify the
    logging issue existed and is fixed by this patch. I ran the test using
    `mvn -Pintegration-tests verify -Dtest=ITPutKudu`
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #3910.
---
 .../nifi-kudu-bundle/nifi-kudu-processors/pom.xml  |  83 +++++++++-
 .../processors/kudu/AbstractKuduProcessor.java     |   4 +-
 .../org/apache/nifi/processors/kudu/PutKudu.java   |  48 +++---
 .../org/apache/nifi/processors/kudu/ITPutKudu.java | 176 +++++++++++++++++++++
 .../apache/nifi/processors/kudu/MockPutKudu.java   |   2 +-
 5 files changed, 286 insertions(+), 27 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
index 62c5351..63f2150 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -24,6 +24,32 @@
     <artifactId>nifi-kudu-processors</artifactId>
     <packaging>jar</packaging>
 
+    <properties>
+        <exclude.tests>None</exclude.tests>
+        <kudu.version>1.10.0</kudu.version>
+    </properties>
+    <build>
+        <extensions>
+            <!-- Used to find the right kudu-binary artifact with the Maven
+                 property ${os.detected.classifier} -->
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.6.2</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>${exclude.tests}</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -46,7 +72,7 @@
         <dependency>
             <groupId>org.apache.kudu</groupId>
             <artifactId>kudu-client</artifactId>
-            <version>1.10.0</version>
+            <version>${kudu.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -64,6 +90,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-test-utils</artifactId>
+            <version>${kudu.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock-record-utils</artifactId>
             <version>1.11.0-SNAPSHOT</version>
@@ -81,5 +113,54 @@
             <version>${jackson.version}</version>
             <scope>test</scope>
         </dependency>
+
     </dependencies>
+    <profiles>
+        <profile>
+            <id>kudu-windows</id>
+            <activation>
+                <os>
+                    <family>Windows</family>
+                </os>
+            </activation>
+            <properties>
+                <!-- Kudu tests do not support Windows. -->
+                <exclude.tests>**/*.java</exclude.tests>
+            </properties>
+        </profile>
+        <profile>
+            <id>kudu-linux</id>
+            <activation>
+                <os>
+                    <family>Unix</family>
+                </os>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.kudu</groupId>
+                    <artifactId>kudu-binary</artifactId>
+                    <version>${kudu.version}</version>
+                    <classifier>${os.detected.classifier}</classifier>
+                    <scope>test</scope>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>kudu-mac</id>
+            <activation>
+                <os>
+                    <family>mac</family>
+                </os>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.kudu</groupId>
+                    <artifactId>kudu-binary</artifactId>
+                    <version>${kudu.version}</version>
+                    <classifier>${os.detected.classifier}</classifier>
+                    <scope>test</scope>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
 </project>
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index fbf931b..166d233 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -121,8 +121,8 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
 
 
     protected KuduClient buildClient(final String masters, final 
ProcessContext context) {
-        final Integer operationTimeout = 
context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-        final Integer adminOperationTimeout = 
context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final Integer operationTimeout = 
context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final Integer adminOperationTimeout = 
context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
 
         return new KuduClient.KuduClientBuilder(masters)
                 .defaultOperationTimeoutMs(operationTimeout)
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 6476886..0bb5882 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -175,11 +175,10 @@ public class PutKudu extends AbstractKuduProcessor {
 
     public static final String RECORD_COUNT_ATTR = "record.count";
 
-    protected OperationType operationType;
-    protected SessionConfiguration.FlushMode flushMode;
-
+    // Properties set in onScheduled.
     protected int batchSize = 100;
     protected int ffbatch   = 1;
+    protected SessionConfiguration.FlushMode flushMode;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -208,9 +207,6 @@ public class PutKudu extends AbstractKuduProcessor {
         return rels;
     }
 
-    protected KerberosUser kerberosUser;
-    protected KuduSession kuduSession;
-
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws IOException, 
LoginException {
         batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
@@ -225,9 +221,8 @@ public class PutKudu extends AbstractKuduProcessor {
         if (flowFiles.isEmpty()) {
             return;
         }
-        kerberosUser = getKerberosUser();
 
-        final KerberosUser user = kerberosUser;
+        final KerberosUser user = getKerberosUser();
         if (user == null) {
             trigger(context, session, flowFiles);
             return;
@@ -246,28 +241,40 @@ public class PutKudu extends AbstractKuduProcessor {
         final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
 
         final KuduClient kuduClient = getKuduClient();
-        kuduSession = getKuduSession(kuduClient);
+        final KuduSession kuduSession = createKuduSession(kuduClient);
 
         final Map<FlowFile, Integer> numRecords = new HashMap<>();
         final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
         final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
 
         int numBuffered = 0;
+        OperationType prevOperationType = OperationType.INSERT;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
-            operationType = 
OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
-            Boolean ignoreNull = 
Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
-            Boolean lowercaseFields = 
Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
+            final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            final OperationType operationType = 
OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
+            final Boolean ignoreNull = 
Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
+            final Boolean lowercaseFields = 
Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
+
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
                 final RecordSet recordSet = recordReader.createRecordSet();
                 final List<String> fieldNames = 
recordReader.getSchema().getFieldNames();
-                final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
                 final KuduTable kuduTable = kuduClient.openTable(tableName);
 
+                // In the case of INSERT_IGNORE the Kudu session is modified 
to ignore row errors.
+                // Because the session is shared across flow files, for 
batching efficiency, we
+                // need to flush when changing to and from INSERT_IGNORE 
operation types.
+                // This should be updated and simplified when KUDU-1563 is 
completed.
+                if (prevOperationType != operationType && (prevOperationType 
== OperationType.INSERT_IGNORE || operationType == 
OperationType.INSERT_IGNORE)) {
+                    flushKuduSession(kuduSession, false, pendingRowErrors);
+                    kuduSession.setIgnoreAllDuplicateRows(operationType == 
OperationType.INSERT_IGNORE);
+                }
+                prevOperationType = operationType;
+
                 Record record = recordSet.next();
                 while (record != null) {
-                    Operation operation = getKuduOperationType(operationType, 
record, fieldNames, ignoreNull, lowercaseFields, kuduTable);
+                    Operation operation = createKuduOperation(operationType, 
record, fieldNames, ignoreNull, lowercaseFields, kuduTable);
                     // We keep track of mappings between Operations and their 
origins,
                     // so that we know which FlowFiles should be marked 
failure after buffered flush.
                     operationFlowFileMap.put(operation, flowFile);
@@ -341,25 +348,20 @@ public class PutKudu extends AbstractKuduProcessor {
         session.adjustCounter("Records Inserted", totalCount, false);
     }
 
-
-    protected KuduSession getKuduSession(final KuduClient client) {
+    protected KuduSession createKuduSession(final KuduClient client) {
         final KuduSession kuduSession = client.newSession();
         kuduSession.setMutationBufferSpace(batchSize);
         kuduSession.setFlushMode(flushMode);
-
-        if (operationType == OperationType.INSERT_IGNORE) {
-            kuduSession.setIgnoreAllDuplicateRows(true);
-        }
-
         return kuduSession;
     }
 
-    private Operation getKuduOperationType(OperationType operationType, Record 
record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields, 
KuduTable kuduTable) {
+    private Operation createKuduOperation(OperationType operationType, Record 
record,
+                                          List<String> fieldNames, Boolean 
ignoreNull,
+                                          Boolean lowercaseFields, KuduTable 
kuduTable) {
         switch (operationType) {
             case DELETE:
                 return deleteRecordFromKudu(kuduTable, record, fieldNames, 
ignoreNull, lowercaseFields);
             case INSERT:
-                return insertRecordToKudu(kuduTable, record, fieldNames, 
ignoreNull, lowercaseFields);
             case INSERT_IGNORE:
                 return insertRecordToKudu(kuduTable, record, fieldNames, 
ignoreNull, lowercaseFields);
             case UPSERT:
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
new file mode 100644
index 0000000..1de5c23
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.cluster.MiniKuduCluster;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+public class ITPutKudu {
+
+    public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
+
+    // The KuduTestHarness automatically starts and stops a real Kudu cluster
+    // when each test is run. Kudu persists its on-disk state in a temporary
+    // directory under a location defined by the environment variable 
TEST_TMPDIR
+    // if set, or under /tmp otherwise. That cluster data is deleted on
+    // successful exit of the test. The cluster output is logged through slf4j.
+    @Rule
+    public KuduTestHarness harness = new KuduTestHarness(
+            new MiniKuduCluster.MiniKuduClusterBuilder()
+                .addMasterServerFlag("--use_hybrid_clock=false")
+                .addTabletServerFlag("--use_hybrid_clock=false")
+    );
+
+    private TestRunner testRunner;
+
+    private PutKudu processor;
+
+    private MockRecordParser readerFactory;
+
+    @Before
+    public void setUp() throws Exception {
+        processor = new PutKudu();
+        testRunner = TestRunners.newTestRunner(processor);
+        createKuduTable();
+        setUpTestRunner(testRunner);
+    }
+
+    @After
+    public void tearDown() {
+        testRunner = null;
+    }
+
+    private void setUpTestRunner(TestRunner testRunner) {
+        testRunner.setProperty(PutKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
+        testRunner.setProperty(PutKudu.KUDU_MASTERS, 
harness.getMasterAddressesAsString());
+        testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, "false");
+        testRunner.setProperty(PutKudu.IGNORE_NULL, "true");
+        testRunner.setProperty(PutKudu.LOWERCASE_FIELD_NAMES, "false");
+        testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory");
+        testRunner.setProperty(PutKudu.INSERT_OPERATION, 
OperationType.INSERT.toString());
+    }
+
+    private void createKuduTable() throws KuduException {
+        KuduClient client =  harness.getClient();
+        List<ColumnSchema> columns = new ArrayList<>();
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", 
Type.INT32).key(true).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("stringVal", 
Type.STRING).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("num32Val", 
Type.INT32).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleVal", 
Type.DOUBLE).build());
+        Schema schema = new Schema(columns);
+        CreateTableOptions opts = new CreateTableOptions()
+            .addHashPartitions(Collections.singletonList("id"), 4);
+        client.createTable(DEFAULT_TABLE_NAME, schema, opts);
+    }
+
+    private void createRecordReader(int numOfRecord) throws 
InitializationException {
+        readerFactory = new MockRecordParser();
+        readerFactory.addSchemaField("id", RecordFieldType.INT);
+        readerFactory.addSchemaField("stringVal", RecordFieldType.STRING);
+        readerFactory.addSchemaField("num32Val", RecordFieldType.INT);
+        readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE);
+
+        for (int i = 0; i < numOfRecord; i++) {
+            readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i);
+        }
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+    }
+
+    @Test
+    public void testWriteKudu() throws IOException, InitializationException {
+        final int recordCount = 100;
+        final int numFlowFiles = 5;
+        createRecordReader(recordCount);
+
+        final String filename = "testWriteKudu-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        // Use values to ensure multiple batches and multiple flow files 
per-trigger
+        testRunner.setProperty(PutKudu.INSERT_OPERATION, 
OperationType.UPSERT.toString());
+        testRunner.setProperty(PutKudu.BATCH_SIZE, "10");
+        testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, "2");
+
+        // Increase the thread count to better simulate a production 
environment
+        testRunner.setThreadCount(4);
+
+        // Trigger the flow
+        IntStream.range(0, numFlowFiles).forEach(i ->
+            testRunner.enqueue("trigger", flowFileAttributes));
+        testRunner.run(numFlowFiles);
+        testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 
numFlowFiles);
+
+        // verify the successful flow file has the expected content & 
attributes
+        final MockFlowFile mockFlowFile =
+            testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
+        mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), 
filename);
+        mockFlowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "100");
+        mockFlowFile.assertContentEquals("trigger");
+
+        // verify we generated provenance events
+        final List<ProvenanceEventRecord> provEvents = 
testRunner.getProvenanceEvents();
+        Assert.assertEquals(numFlowFiles, provEvents.size());
+
+        // verify it was a SEND event with the correct URI
+        final ProvenanceEventRecord provEvent = provEvents.get(0);
+        Assert.assertEquals(ProvenanceEventType.SEND, 
provEvent.getEventType());
+
+        // Verify Kudu record count.
+        KuduClient client = harness.getClient();
+        KuduTable kuduTable = client.openTable(DEFAULT_TABLE_NAME);
+        KuduScanner scanner = client.newScannerBuilder(kuduTable).build();
+        int count = 0;
+        for (RowResult unused : scanner) {
+            count++;
+        }
+        Assert.assertEquals(recordCount, count);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index f416e96..ad10715 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -162,7 +162,7 @@ public class MockPutKudu extends PutKudu {
     }
 
     @Override
-    protected KuduSession getKuduSession(KuduClient client) {
+    protected KuduSession createKuduSession(KuduClient client) {
         return session;
     }
 }
\ No newline at end of file

Reply via email to