turcsanyip commented on code in PR #7076:
URL: https://github.com/apache/nifi/pull/7076#discussion_r1206003566


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java:
##########
@@ -403,36 +420,76 @@ public void stopClient() {
                 getLogger().warn("Event Processor Client stop failed", e);
             }
             eventProcessorClient = null;
-            processSessionFactory = null;
-            readerFactory = null;
-            writerFactory = null;
         }
     }
 
-    protected EventProcessorClient createClient(final ProcessContext context) {
-        namespaceName = 
context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
+    protected EventProcessorClient createClient(final ProcessContext context, 
final ProcessSessionFactory processSessionFactory) {
+        final String namespaceName = 
context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
         final String eventHubName = 
context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
         final String consumerGroup = 
context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
+        final String identifier = UUID.randomUUID().toString();
 
-        final String containerName = 
defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(),
 eventHubName);
-        final String storageConnectionString = 
createStorageConnectionString(context);
-        final BlobContainerAsyncClient blobContainerAsyncClient = new 
BlobContainerClientBuilder()
-                .connectionString(storageConnectionString)
-                .containerName(containerName)
-                .buildAsyncClient();
-        final BlobCheckpointStore checkpointStore = new 
BlobCheckpointStore(blobContainerAsyncClient);
+        CheckpointStore checkpointStore;
+        final Map<String, EventPosition> legacyPartitionEventPosition;
+
+        final CheckpointStrategy checkpointStrategy = 
CheckpointStrategy.valueOf(
+                context.getProperty(CHECKPOINT_STRATEGY).getValue()
+        );
+        if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
+            final String containerName = 
defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(),
 eventHubName);
+            final String storageConnectionString = 
createStorageConnectionString(context);
+            final BlobContainerAsyncClient blobContainerAsyncClient = 
getBlobContainerAsyncClient(containerName, storageConnectionString);
+            checkpointStore = 
getCheckpointStoreFromBlobContainer(blobContainerAsyncClient);
+            legacyPartitionEventPosition = 
getLegacyPartitionEventPosition(blobContainerAsyncClient, consumerGroup);
+        } else {
+            checkpointStore = new ComponentStateCheckpointStore(
+                    identifier,
+                    new ComponentStateCheckpointStore.State() {
+                        public StateMap getState() throws IOException {
+                            final ProcessSession session = 
processSessionFactory.createSession();
+                            return session.getState(Scope.CLUSTER);
+                        }
+
+                        public boolean replaceState(StateMap oldValue, 
Map<String, String> newValue) throws IOException {
+                            final ProcessSession session = 
processSessionFactory.createSession();
+                            if (!session.replaceState(oldValue, newValue, 
Scope.CLUSTER)) {

Review Comment:
   `ProcessSession.replaceState()` does not provide fully proper optimistic 
locking because the state can be changed by another session between 
`replaceState()` and `commit()`. So even if `replaceState()` returns `true`, 
the state change may be omitted at commit time due to a concurrent update.
   
   I suggest using `ProcessContext.getStateManager().replace()` instead which 
has the right optimistic locking semantics which is required from a checkpoint 
store implementation.
   
   Please note `ProcessContext.getStateManager().replace()` cannot initialize 
the state currently (see also 
[NIFI-11595](https://issues.apache.org/jira/browse/NIFI-11595)) so it needs to 
be created with `setState()` before using `replace()`. E.g. in `@OnScheduled` 
like this:
   ```
       @OnScheduled
       public void onScheduled(ProcessContext context) throws IOException {
           if (getNodeTypeProvider().isPrimary()) {
               final StateManager stateManager = context.getStateManager();
               final StateMap state = stateManager.getState(Scope.CLUSTER);
   
               if (!state.getStateVersion().isPresent()) {
                   stateManager.setState(new HashMap<>(), Scope.CLUSTER);
               }
           }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to