This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b5e32a09 SAMZA-2741: [Pipeline Drain] Simplifying DrainUtils' writeDrainNotification method and change tests (#1629)
0b5e32a09 is described below

commit 0b5e32a09feb58d789706052372db143089a27ff
Author: ajo thomas <[email protected]>
AuthorDate: Mon Aug 22 10:53:17 2022 -0700

    SAMZA-2741: [Pipeline Drain] Simplifying DrainUtils' writeDrainNotification method and change tests (#1629)
---
 .../java/org/apache/samza/drain/DrainUtils.java    | 27 ++++++++++++++--
 .../samza/runtime/LocalApplicationRunner.java      |  1 -
 .../apache/samza/util/CoordinatorStreamUtil.scala  | 26 ++++++++++++++-
 .../org/apache/samza/drain/DrainMonitorTests.java  | 37 ++--------------------
 .../drain/DrainHighLevelApiIntegrationTest.java    |  9 +++++-
 .../drain/DrainLowLevelApiIntegrationTest.java     | 11 +++++--
 6 files changed, 70 insertions(+), 41 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java b/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
index f7123817d..66449327e 100644
--- a/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
@@ -19,6 +19,7 @@
 package org.apache.samza.drain;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.UUID;
@@ -28,6 +29,7 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,13 +46,14 @@ public class DrainUtils {
   }
 
   /**
-   * Writes a {@link DrainNotification} to the underlying metastore. This method should be used by external controllers
-   * to issue a DrainNotification to the JobCoordinator and Samza Containers.
+   * Writes a {@link DrainNotification} for a given runId to the underlying metastore. Visible for testing; external
+   * controllers should use {@link #writeDrainNotification(MetadataStore)}, which reads the runId from config.
    * @param metadataStore Metadata store to write drain notification to.
    * @param runId runId for the DrainNotification
    *
    * @return generated uuid for the DrainNotification
    */
+  @VisibleForTesting
   public static UUID writeDrainNotification(MetadataStore metadataStore, String runId) {
     Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot be null.");
     Preconditions.checkArgument(!Strings.isNullOrEmpty(runId), "runId should be non-null.");
@@ -71,6 +74,26 @@ public class DrainUtils {
     return uuid;
   }
 
+  /**
+   * Writes a {@link DrainNotification} to the underlying metastore for the runId in the job config stored there.
+   * External controllers should use this to issue a DrainNotification to the JobCoordinator and Samza Containers.
+   * @param metadataStore Metadata store to read the job config from and write drain notification to.
+   * @return generated uuid for the DrainNotification
+   * @throws SamzaException if no runId is found in the config read from the metadata store.
+   */
+  public static UUID writeDrainNotification(MetadataStore metadataStore) {
+    Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot be null.");
+    final Config config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(metadataStore);
+    final ApplicationConfig applicationConfig = new ApplicationConfig(config);
+    final String runId = applicationConfig.getRunId();
+    if (Strings.isNullOrEmpty(runId)) {
+      throw new SamzaException("Unable to retrieve runId from metadata store. DrainNotification will not be "
+          + "written to the metadata store.");
+    }
+    LOG.info("Received runId {}", runId);
+    return writeDrainNotification(metadataStore, runId);
+  }
+
   /**
    * Cleans up DrainNotifications for the current deployment from the underlying metadata store.
    * The current runId is extracted from the config.
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 7b0a08ffe..0fbc168ea 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -220,7 +220,6 @@ public class LocalApplicationRunner implements ApplicationRunner {
 
     try {
       List<JobConfig> jobConfigs = planner.prepareJobs();
-
       // create the StreamProcessors
       if (jobConfigs.isEmpty()) {
         throw new SamzaException("No jobs to run.");
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 04626632c..0de25d267 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -21,7 +21,6 @@
 package org.apache.samza.util
 
 import java.util
-
 import org.apache.samza.SamzaException
 import org.apache.samza.config._
 import org.apache.samza.coordinator.CoordinationConstants
@@ -171,6 +170,31 @@ object CoordinatorStreamUtil extends Logging {
     new MapConfig(configMap)
   }
 
+  /**
+   * Writes each config entry to the metadata store, skipping (with a warning) entries whose value or bytes are null.
+   * @param metadataStore the metadata store (e.g. a {@link CoordinatorStreamStore}) to write the config to.
+   * @param config the config whose entries are written to the metadata store.
+   */
+  def writeConfigToCoordinatorStream(metadataStore: MetadataStore, config: Config): Unit = {
+    val coordinatorStream: NamespaceAwareCoordinatorStreamStore =
+      new NamespaceAwareCoordinatorStreamStore(metadataStore, SetConfig.TYPE)
+    val valueSerde: CoordinatorStreamValueSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE)
+    config.entrySet().forEach((entry) => {
+      val key = entry.getKey
+      val value = entry.getValue
+      if (value == null) {
+        warn("Value for key: %s in config is null. Ignoring it." format key)
+      } else {
+        val valueBytes = valueSerde.toBytes(value)
+        if (valueBytes == null) {
+          warn("Serialized value for key: %s in config is null. Ignoring it." format key)
+        } else {
+          coordinatorStream.put(key, valueBytes)
+        }
+      }
+    })
+  }
+
   def writeConfigToCoordinatorStream(config: Config, resetJobConfig: Boolean = true) {
     debug("config: %s" format config)
     val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
diff --git a/samza-core/src/test/java/org/apache/samza/drain/DrainMonitorTests.java b/samza-core/src/test/java/org/apache/samza/drain/DrainMonitorTests.java
index 80061b330..6f845eaba 100644
--- a/samza-core/src/test/java/org/apache/samza/drain/DrainMonitorTests.java
+++ b/samza-core/src/test/java/org/apache/samza/drain/DrainMonitorTests.java
@@ -114,7 +114,7 @@ public class DrainMonitorTests {
     final AtomicInteger numCallbacks = new AtomicInteger(0);
     final CountDownLatch latch = new CountDownLatch(1);
     // write drain before monitor start
-    DrainUtils.writeDrainNotification(coordinatorStreamStore, TEST_RUN_ID);
+    DrainUtils.writeDrainNotification(coordinatorStreamStore);
     DrainMonitor drainMonitor = new DrainMonitor(coordinatorStreamStore, CONFIG);
     drainMonitor.registerDrainCallback(() -> {
       numCallbacks.incrementAndGet();
@@ -141,7 +141,7 @@ public class DrainMonitorTests {
       latch.countDown();
     });
     drainMonitor.start();
-    DrainUtils.writeDrainNotification(coordinatorStreamStore, TEST_RUN_ID);
+    DrainUtils.writeDrainNotification(coordinatorStreamStore);
     if (!latch.await(2, TimeUnit.SECONDS)) {
       Assert.fail("Timed out waiting for drain callback to complete");
     }
@@ -149,30 +149,6 @@ public class DrainMonitorTests {
     Assert.assertEquals(1, numCallbacks.get());
   }
 
-  @Test
-  public void testCallbackNotCalledDueToMismatchedRunId() throws InterruptedException {
-    // The test fails due to timeout as the published DrainNotification's runId doesn't match runId
-    // in the config
-    exceptionRule.expect(AssertionError.class);
-    exceptionRule.expectMessage("Timed out waiting for drain callback to complete.");
-    final AtomicInteger numCallbacks = new AtomicInteger(0);
-    final CountDownLatch latch = new CountDownLatch(1);
-
-    DrainMonitor drainMonitor = new DrainMonitor(coordinatorStreamStore, CONFIG, 100L);
-
-    drainMonitor.registerDrainCallback(() -> {
-      numCallbacks.incrementAndGet();
-      latch.countDown();
-    });
-
-    drainMonitor.start();
-    final String mismatchedRunId = "bar";
-    DrainUtils.writeDrainNotification(coordinatorStreamStore, mismatchedRunId);
-    if (!latch.await(2, TimeUnit.SECONDS)) {
-      Assert.fail("Timed out waiting for drain callback to complete.");
-    }
-  }
-
   @Test
   public void testDrainMonitorStop() {
     DrainMonitor drainMonitor = new DrainMonitor(coordinatorStreamStore, CONFIG, 100L);
@@ -184,16 +160,9 @@ public class DrainMonitorTests {
 
   @Test
   public void testShouldDrain() {
-    DrainUtils.writeDrainNotification(coordinatorStreamStore, TEST_RUN_ID);
+    DrainUtils.writeDrainNotification(coordinatorStreamStore);
     NamespaceAwareCoordinatorStreamStore drainStore =
         new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, DrainUtils.DRAIN_METADATA_STORE_NAMESPACE);
     Assert.assertTrue(DrainMonitor.shouldDrain(drainStore, TEST_RUN_ID));
-
-    // Cleanup old drain message
-    DrainUtils.cleanup(coordinatorStreamStore, CONFIG);
-
-    final String mismatchedRunId = "bar";
-    DrainUtils.writeDrainNotification(coordinatorStreamStore, mismatchedRunId);
-    Assert.assertFalse(DrainMonitor.shouldDrain(drainStore, TEST_RUN_ID));
   }
 }
diff --git a/samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java
index c1a6f914f..c8cd5340f 100644
--- a/samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java
@@ -49,6 +49,7 @@ import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescripto
 import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.apache.samza.test.table.TestTableData;
 import org.apache.samza.test.table.TestTableData.PageView;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -110,12 +111,18 @@ public class DrainHighLevelApiIntegrationTest {
     Config configFromRunner = testRunner.getConfig();
     MetadataStore metadataStore = metadataStoreFactory.getMetadataStore("NoOp", configFromRunner, new MetricsRegistryMap());
 
+    // Write configs to the coordinator stream here, as neither the passthrough JC nor the StreamProcessor writes
+    // configs to the coordinator stream. RemoteApplicationRunner typically writes the configs to the metadata store
+    // before starting the JC.
+    // We do this so that DrainUtils.writeDrainNotification can read app.run.id from the config.
+    CoordinatorStreamUtil.writeConfigToCoordinatorStream(metadataStore, configFromRunner);
+
     // write drain message after a delay
     ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
     executorService.schedule(new Callable<String>() {
       @Override
       public String call() throws Exception {
-        UUID uuid = DrainUtils.writeDrainNotification(metadataStore, runId);
+        UUID uuid = DrainUtils.writeDrainNotification(metadataStore);
         return uuid.toString();
       }
     }, 2000L, TimeUnit.MILLISECONDS);
diff --git a/samza-test/src/test/java/org/apache/samza/test/drain/DrainLowLevelApiIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/drain/DrainLowLevelApiIntegrationTest.java
index 7579c6eda..ed385f035 100644
--- a/samza-test/src/test/java/org/apache/samza/test/drain/DrainLowLevelApiIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/drain/DrainLowLevelApiIntegrationTest.java
@@ -54,6 +54,7 @@ import org.apache.samza.test.framework.TestRunner;
 import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
 import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.apache.samza.test.table.TestTableData;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -110,7 +111,7 @@ public class DrainLowLevelApiIntegrationTest {
 
   // The test can be occasionally flaky, so we set Ignore annotation
   // Remove ignore annotation and run the test as follows:
-  // ./gradlew :samza-test:test --tests org.apache.samza.test.drain.DrainHighLevelApiIntegrationTest -PscalaSuffix=2.12
+  // ./gradlew :samza-test:test --tests org.apache.samza.test.drain.DrainLowLevelApiIntegrationTest -PscalaSuffix=2.12
   @Ignore
   @Test
   public void testPipeline() {
@@ -139,12 +140,18 @@ public class DrainLowLevelApiIntegrationTest {
     MetadataStore
         metadataStore = metadataStoreFactory.getMetadataStore("NoOp", configFromRunner, new MetricsRegistryMap());
 
+    // Write configs to the coordinator stream here, as neither the passthrough JC nor the StreamProcessor writes
+    // configs to the coordinator stream. RemoteApplicationRunner typically writes the configs to the metadata store
+    // before starting the JC.
+    // We do this so that DrainUtils.writeDrainNotification can read app.run.id from the config.
+    CoordinatorStreamUtil.writeConfigToCoordinatorStream(metadataStore, configFromRunner);
+
     // write drain message after a delay
     ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
     executorService.schedule(new Callable<String>() {
       @Override
       public String call() throws Exception {
-        UUID uuid = DrainUtils.writeDrainNotification(metadataStore, runId);
+        UUID uuid = DrainUtils.writeDrainNotification(metadataStore);
         return uuid.toString();
       }
     }, 2000L, TimeUnit.MILLISECONDS);

Reply via email to