vjagadish1989 commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r286724031
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
 ##########
 @@ -104,36 +127,89 @@ private void createStreams(String planId, 
List<StreamSpec> intStreams, StreamMan
       LOG.info("Set of intermediate streams is empty. Nothing to create.");
       return;
     }
-    LOG.info("A single processor must create the intermediate streams. 
Processor {} will attempt to acquire the lock.", uid);
+    LOG.info("A single processor must create the intermediate streams. 
Processor {} will attempt to acquire the lock.", processorId);
     // Move the scope of coordination utils within stream creation to address 
long idle connection problem.
     // Refer SAMZA-1385 for more details
-    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(userConfig);
-    String coordinationId = new ApplicationConfig(userConfig).getGlobalAppId() 
+ APPLICATION_RUNNER_PATH_SUFFIX;
-    CoordinationUtils coordinationUtils =
-        
jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, 
uid, userConfig);
     if (coordinationUtils == null) {
-      LOG.warn("Processor {} failed to create utils. Each processor will 
attempt to create streams.", uid);
+      LOG.warn("Processor {} failed to create utils. Each processor will 
attempt to create streams.", processorId);
       // each application process will try creating the streams, which
       // requires stream creation to be idempotent
       streamManager.createStreams(intStreams);
       return;
     }
 
-    DistributedLockWithState lockWithState = 
coordinationUtils.getLockWithState(planId);
+    // If BATCH, then need to create new intermediate streams every run.
+    // planId does not change every run and hence, need to use runid
+    // as the lockId to create a new lock with state each run
+    // to create new streams each run.
+    // If run.id is null, defaults to old behavior of using planId
+    boolean isAppModeBatch = new ApplicationConfig(userConfig).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
+    String lockId = planId;
+    if (isAppModeBatch && runId != null) {
+      lockId = runId;
+    }
     try {
-      // check if the processor needs to go through leader election and stream 
creation
-      if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) {
-        LOG.info("lock acquired for streams creation by " + uid);
-        streamManager.createStreams(intStreams);
-        lockWithState.unlockAndSet();
-      } else {
-        LOG.info("Processor {} did not obtain the lock for streams creation. 
They must've been created by another processor.", uid);
-      }
-    } catch (TimeoutException e) {
-      String msg = String.format("Processor {} failed to get the lock for 
stream initialization", uid);
-      throw new SamzaException(msg, e);
+      checkAndCreateStreams(lockId, intStreams, streamManager);
+    } catch (TimeoutException te) {
+      throw new SamzaException(String.format("Processor {} failed to get the 
lock for stream initialization within timeout.", processorId), te);
     } finally {
-      coordinationUtils.close();
+      if (!isAppModeBatch && coordinationUtils != null) {
+        coordinationUtils.close();
+      }
+    }
+  }
+
+  private void checkAndCreateStreams(String lockId, List<StreamSpec> 
intStreams, StreamManager streamManager) throws TimeoutException {
+    MetadataStore metadataStore = getMetadataStore();
+    DistributedLock distributedLock = coordinationUtils.getLock(lockId);
+    if (distributedLock == null || metadataStore == null) {
+      LOG.warn("Processor {} failed to create utils. Each processor will 
attempt to create streams.", processorId);
+      // each application process will try creating the streams, which 
requires stream creation to be idempotent
+      streamManager.createStreams(intStreams);
+      return;
+    }
+    //Start timer for timeout
+    long startTime = System.currentTimeMillis();
+    long lockTimeout = 
TimeUnit.MILLISECONDS.convert(CoordinationConstants.LOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+
+    // If "stream created state" exists in store then skip stream creation
+    // Else acquire lock, create streams, set state in store and unlock
+    // Checking for state before acquiring lock to prevent all processors from 
acquiring lock
+    // In a while loop so that if two processors check state simultaneously 
then
+    // to make sure the processor not acquiring the lock
+    // does not die of timeout exception and comes back and checks for state 
and proceeds
+    while ((System.currentTimeMillis() - startTime) < lockTimeout) {
+      if (metadataStore.get(String.format(STREAM_CREATED_STATE_KEY, lockId)) 
!= null) {
+        LOG.info("Processor {} found streams created state data. They must've 
been created by another processor.", processorId);
+        break;
+      }
+      try {
+        if (distributedLock.lock(Duration.ofMillis(10000))) {
+          LOG.info("lock acquired for streams creation by Processor " + 
processorId);
+          streamManager.createStreams(intStreams);
+          String streamCreatedMessage = "Streams created by processor " + 
processorId;
+          metadataStore.put(String.format(STREAM_CREATED_STATE_KEY, lockId), 
streamCreatedMessage.getBytes("UTF-8"));
+          distributedLock.unlock();
+          break;
+        } else {
+          LOG.info("Processor {} failed to get the lock for stream 
initialization. Will try again until time out", processorId);
+        }
+      } catch (UnsupportedEncodingException e) {
+        String msg = String.format("Processor {} failed to encode string for 
stream initialization", processorId);
 
 Review comment:
   Inline `msg`: pass the formatted string directly to the exception constructor instead of assigning it to a local variable first.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to