exceptionfactory commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1203002539


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java:
##########
@@ -117,13 +131,22 @@ public class TestConsumeAzureEventHub {
     @Mock
     RecordReader reader;
 
+    @Mock
+    CheckpointStore checkpointStore;
+
+    @Mock
+    BlobContainerAsyncClient blobContainerAsyncClient;
+
+    @Captor
+    private ArgumentCaptor<Consumer<EventBatchContext>> 
eventBatchProcessorCapture;
+
     private MockConsumeAzureEventHub processor;
 
     private TestRunner testRunner;
 
     @BeforeEach
     public void setupProcessor() {
-        processor = new MockConsumeAzureEventHub();
+        processor = spy(new MockConsumeAzureEventHub());

Review Comment:
   The `MockConsumeAzureEventHub` Processor already isolates certain behavior 
for testing, so introducing a `spy()` creates and additional level of 
indirection. Is there a particular reason for this approach as opposed to other 
alternatives?



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/utils/TestComponentStateCheckpointStore.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.utils;
+
+import com.azure.messaging.eventhubs.models.Checkpoint;
+import com.azure.messaging.eventhubs.models.PartitionOwnership;
+import org.apache.nifi.components.state.StateMap;
+import 
org.apache.nifi.processors.azure.eventhub.utils.ComponentStateCheckpointStore.State;
+import org.apache.nifi.state.MockStateMap;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertThrows;
+
+class TestComponentStateCheckpointStore {
+    private static final String EVENT_HUB_NAMESPACE = "NAMESPACE";
+    private static final String EVENT_HUB_NAME = "NAME";
+    private static final String CONSUMER_GROUP = "CONSUMER";
+    private static final String PARTITION_ID = "1";
+    private static final long OFFSET = 1234;
+
+    final static String IDENTIFIER = "id";
+    ComponentStateCheckpointStore checkpointStore;
+    Checkpoint checkpoint;
+    PartitionOwnership partitionOwnership;
+
+    boolean failToReplaceState = false;
+    boolean throwIOExceptionOnGetState = false;
+    boolean throwIOExceptionOnReplaceState = false;
+
+    @Test
+    public void testClaimOwnershipSuccess() {
+        var claimed = checkpointStore.claimOwnership(List.of(
+            partitionOwnership
+        )).blockFirst();
+        var listed = checkpointStore.listOwnership(
+                EVENT_HUB_NAMESPACE,
+                EVENT_HUB_NAME,
+                CONSUMER_GROUP
+        ).blockFirst();
+        assert listed.getETag().equals(claimed.getETag());

Review Comment:
   The `assert` keyword is not intended for testing. All instances should be 
replaces with `assertEquals()` or other applicable JUnit 5 assert methods.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -403,36 +420,76 @@ public void stopClient() {
                 getLogger().warn("Event Processor Client stop failed", e);
             }
             eventProcessorClient = null;
-            processSessionFactory = null;
-            readerFactory = null;
-            writerFactory = null;
         }
     }
 
-    protected EventProcessorClient createClient(final ProcessContext context) {
-        namespaceName = 
context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
+    protected EventProcessorClient createClient(final ProcessContext context, 
final ProcessSessionFactory processSessionFactory) {
+        final String namespaceName = 
context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
         final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
         final String consumerGroup = 
context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
+        final String identifier = UUID.randomUUID().toString();
 
-        final String containerName = 
defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(),
 eventHubName);
-        final String storageConnectionString = 
createStorageConnectionString(context);
-        final BlobContainerAsyncClient blobContainerAsyncClient = new 
BlobContainerClientBuilder()
-                .connectionString(storageConnectionString)
-                .containerName(containerName)
-                .buildAsyncClient();
-        final BlobCheckpointStore checkpointStore = new 
BlobCheckpointStore(blobContainerAsyncClient);
+        CheckpointStore checkpointStore;
+        final Map<String, EventPosition> legacyPartitionEventPosition;
+
+        final CheckpointStrategy checkpointStrategy = 
CheckpointStrategy.valueOf(
+                context.getProperty(CHECKPOINT_STRATEGY).getValue()
+        );
+        if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
+            final String containerName = 
defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(),
 eventHubName);
+            final String storageConnectionString = 
createStorageConnectionString(context);
+            final BlobContainerAsyncClient blobContainerAsyncClient = 
getBlobContainerAsyncClient(containerName, storageConnectionString);
+            checkpointStore = 
getCheckpointStoreFromBlobContainer(blobContainerAsyncClient);
+            legacyPartitionEventPosition = 
getLegacyPartitionEventPosition(blobContainerAsyncClient, consumerGroup);
+        } else {
+            checkpointStore = new ComponentStateCheckpointStore(
+                    identifier,
+                    new ComponentStateCheckpointStore.State() {

Review Comment:
   Having two different anonymous inner class implementations make this hard to 
follow. Recommend breaking this out to a distinct class.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -486,11 +563,36 @@ protected String getTransitUri(final PartitionContext 
partitionContext) {
             if (readerFactory == null || writerFactory == null) {
                 writeFlowFiles(eventBatchContext, session, stopWatch);
             } else {
-                writeRecords(eventBatchContext, session, stopWatch);
+                writeRecords(eventBatchContext, session, readerFactory, 
writerFactory, stopWatch);
             }
 
-            // Commit ProcessSession and then update Azure Event Hubs 
checkpoint status
-            session.commitAsync(eventBatchContext::updateCheckpoint);
+            // As a special case, when the checkpoint strategy is component 
state,
+            // we reuse the current session.
+            if (checkpointStrategy == CheckpointStrategy.COMPONENT_STATE) {
+                eventBatchContext = new EventBatchContext(
+                        eventBatchContext.getPartitionContext(),
+                        eventBatchContext.getEvents(),
+                        new ComponentStateCheckpointStore(
+                                identifier,
+                                new ComponentStateCheckpointStore.State() {
+                                    public StateMap getState() throws 
IOException {
+                                        return session.getState(Scope.CLUSTER);
+                                    }
+
+                                    public boolean replaceState(StateMap 
oldValue, Map<String, String> newValue) {
+                                        return false;
+                                    }
+                                }
+                        ),

Review Comment:
   The inner anonymous class results in multiple levels of nesting that make 
this hard to follow. Recommend breaking it out to an explicit class.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java:
##########
@@ -205,14 +228,13 @@ public void 
testProcessorConfigValidityWithStorageKeySet() {
         testRunner.assertValid();
     }
 
-    @Test
-    public void testReceiveOne() {
-        setProperties();
-        testRunner.run(1, false);
+    @EnumSource(CheckpointStrategy.class)

Review Comment:
   Changing all of these test methods to run for both checkpoint strategy is 
unnecessary and results in unnecessary processing. Recommend creating a couple 
separate methods instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to