This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0b5e32a09 SAMZA-2741: [Pipeline Drain] Simplifying DrainUtils'
writeDrainNotification method and change tests (#1629)
0b5e32a09 is described below
commit 0b5e32a09feb58d789706052372db143089a27ff
Author: ajo thomas <[email protected]>
AuthorDate: Mon Aug 22 10:53:17 2022 -0700
SAMZA-2741: [Pipeline Drain] Simplifying DrainUtils' writeDrainNotification
method and change tests (#1629)
---
.../java/org/apache/samza/drain/DrainUtils.java | 27 ++++++++++++++--
.../samza/runtime/LocalApplicationRunner.java | 1 -
.../apache/samza/util/CoordinatorStreamUtil.scala | 26 ++++++++++++++-
.../org/apache/samza/drain/DrainMonitorTests.java | 37 ++--------------------
.../drain/DrainHighLevelApiIntegrationTest.java | 9 +++++-
.../drain/DrainLowLevelApiIntegrationTest.java | 11 +++++--
6 files changed, 70 insertions(+), 41 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
b/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
index f7123817d..66449327e 100644
--- a/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
@@ -19,6 +19,7 @@
package org.apache.samza.drain;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.UUID;
@@ -28,6 +29,7 @@ import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.util.CoordinatorStreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,13 +46,14 @@ public class DrainUtils {
}
/**
- * Writes a {@link DrainNotification} to the underlying metastore. This
method should be used by external controllers
- * to issue a DrainNotification to the JobCoordinator and Samza Containers.
+ * Writes a {@link DrainNotification} for a given runId to the underlying metastore, issuing it to the JobCoordinator
+ * and Samza Containers. Exposed for testing; external controllers should use {@link #writeDrainNotification(MetadataStore)}.
* @param metadataStore Metadata store to write drain notification to.
* @param runId runId for the DrainNotification
*
* @return generated uuid for the DrainNotification
*/
+ @VisibleForTesting
public static UUID writeDrainNotification(MetadataStore metadataStore,
String runId) {
Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot
be null.");
Preconditions.checkArgument(!Strings.isNullOrEmpty(runId), "runId should
be non-null.");
@@ -71,6 +74,26 @@ public class DrainUtils {
return uuid;
}
+ /**
+ * Writes a {@link DrainNotification} to the underlying metastore for the runId read from the config stored in it.
+ * This method should be used by external controllers to issue a DrainNotification to the JobCoordinator and Samza Containers.
+ * @param metadataStore Metadata store to read the config from and to write the drain notification to.
+ * @return generated uuid for the DrainNotification
+ * @throws SamzaException if the config in the metadata store has no (or an empty) runId.
+ */
+ public static UUID writeDrainNotification(MetadataStore metadataStore) {
+ Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot
be null.");
+ final Config config =
CoordinatorStreamUtil.readConfigFromCoordinatorStream(metadataStore);
+ final ApplicationConfig applicationConfig = new ApplicationConfig(config);
+ final String runId = applicationConfig.getRunId();
+ if (Strings.isNullOrEmpty(runId)) {
+ throw new SamzaException("Unable to retrieve runId from metadata store.
DrainNotification will not be "
+ + "written to the metadata store.");
+ }
+ LOG.info("Received runId {}", runId);
+ return writeDrainNotification(metadataStore, runId); // delegate to the explicit-runId overload
+ }
+
/**
* Cleans up DrainNotifications for the current deployment from the
underlying metadata store.
* The current runId is extracted from the config.
diff --git
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 7b0a08ffe..0fbc168ea 100644
---
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -220,7 +220,6 @@ public class LocalApplicationRunner implements
ApplicationRunner {
try {
List<JobConfig> jobConfigs = planner.prepareJobs();
-
// create the StreamProcessors
if (jobConfigs.isEmpty()) {
throw new SamzaException("No jobs to run.");
diff --git
a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 04626632c..0de25d267 100644
---
a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++
b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -21,7 +21,6 @@
package org.apache.samza.util
import java.util
-
import org.apache.samza.SamzaException
import org.apache.samza.config._
import org.apache.samza.coordinator.CoordinationConstants
@@ -171,6 +170,31 @@ object CoordinatorStreamUtil extends Logging {
new MapConfig(configMap)
}
+ /**
+ * Writes each entry of the given config to the metadata store under the SetConfig namespace; null values are skipped.
+ * @param metadataStore an instance of the instantiated {@link CoordinatorStreamStore}.
+ * @param config the configuration to write to the coordinator stream.
+ */
+ def writeConfigToCoordinatorStream(metadataStore: MetadataStore, config: Config): Unit = {
+ val coordinatorStream: NamespaceAwareCoordinatorStreamStore =
+ new NamespaceAwareCoordinatorStreamStore(metadataStore, SetConfig.TYPE)
+ val valueSerde: CoordinatorStreamValueSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE)
+ config.entrySet().forEach((entry) => {
+ val key = entry.getKey
+ val value = entry.getValue
+ if (value == null) {
+ warn("Value for key: %s in config is null. Ignoring it." format key)
+ } else {
+ val valueBytes = valueSerde.toBytes(value)
+ if (valueBytes == null) {
+ warn("Serialized value for key: %s in config is null. Ignoring it." format key)
+ } else {
+ coordinatorStream.put(key, valueBytes)
+ }
+ }
+ })
+ }
+
def writeConfigToCoordinatorStream(config: Config, resetJobConfig: Boolean =
true) {
debug("config: %s" format config)
val coordinatorSystemConsumer = new
CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
diff --git
a/samza-core/src/test/java/org/apache/samza/drain/DrainMonitorTests.java
b/samza-core/src/test/java/org/apache/samza/drain/DrainMonitorTests.java
index 80061b330..6f845eaba 100644
--- a/samza-core/src/test/java/org/apache/samza/drain/DrainMonitorTests.java
+++ b/samza-core/src/test/java/org/apache/samza/drain/DrainMonitorTests.java
@@ -114,7 +114,7 @@ public class DrainMonitorTests {
final AtomicInteger numCallbacks = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(1);
// write drain before monitor start
- DrainUtils.writeDrainNotification(coordinatorStreamStore, TEST_RUN_ID);
+ DrainUtils.writeDrainNotification(coordinatorStreamStore);
DrainMonitor drainMonitor = new DrainMonitor(coordinatorStreamStore,
CONFIG);
drainMonitor.registerDrainCallback(() -> {
numCallbacks.incrementAndGet();
@@ -141,7 +141,7 @@ public class DrainMonitorTests {
latch.countDown();
});
drainMonitor.start();
- DrainUtils.writeDrainNotification(coordinatorStreamStore, TEST_RUN_ID);
+ DrainUtils.writeDrainNotification(coordinatorStreamStore);
if (!latch.await(2, TimeUnit.SECONDS)) {
Assert.fail("Timed out waiting for drain callback to complete");
}
@@ -149,30 +149,6 @@ public class DrainMonitorTests {
Assert.assertEquals(1, numCallbacks.get());
}
- @Test
- public void testCallbackNotCalledDueToMismatchedRunId() throws
InterruptedException {
- // The test fails due to timeout as the published DrainNotification's
runId doesn't match runId
- // in the config
- exceptionRule.expect(AssertionError.class);
- exceptionRule.expectMessage("Timed out waiting for drain callback to
complete.");
- final AtomicInteger numCallbacks = new AtomicInteger(0);
- final CountDownLatch latch = new CountDownLatch(1);
-
- DrainMonitor drainMonitor = new DrainMonitor(coordinatorStreamStore,
CONFIG, 100L);
-
- drainMonitor.registerDrainCallback(() -> {
- numCallbacks.incrementAndGet();
- latch.countDown();
- });
-
- drainMonitor.start();
- final String mismatchedRunId = "bar";
- DrainUtils.writeDrainNotification(coordinatorStreamStore, mismatchedRunId);
- if (!latch.await(2, TimeUnit.SECONDS)) {
- Assert.fail("Timed out waiting for drain callback to complete.");
- }
- }
-
@Test
public void testDrainMonitorStop() {
DrainMonitor drainMonitor = new DrainMonitor(coordinatorStreamStore,
CONFIG, 100L);
@@ -184,16 +160,17 @@ public class DrainMonitorTests {
@Test
public void testShouldDrain() {
- DrainUtils.writeDrainNotification(coordinatorStreamStore, TEST_RUN_ID);
+ DrainUtils.writeDrainNotification(coordinatorStreamStore);
NamespaceAwareCoordinatorStreamStore drainStore =
new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
DrainUtils.DRAIN_METADATA_STORE_NAMESPACE);
Assert.assertTrue(DrainMonitor.shouldDrain(drainStore, TEST_RUN_ID));

// Cleanup old drain message
DrainUtils.cleanup(coordinatorStreamStore, CONFIG);

+ // Keep covering DrainMonitor's runId filtering via the @VisibleForTesting explicit-runId overload.
final String mismatchedRunId = "bar";
DrainUtils.writeDrainNotification(coordinatorStreamStore, mismatchedRunId);
Assert.assertFalse(DrainMonitor.shouldDrain(drainStore, TEST_RUN_ID));
}
}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java
b/samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java
index c1a6f914f..c8cd5340f 100644
---
a/samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java
+++
b/samza-test/src/test/java/org/apache/samza/test/drain/DrainHighLevelApiIntegrationTest.java
@@ -49,6 +49,7 @@ import
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescripto
import
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.test.table.TestTableData;
import org.apache.samza.test.table.TestTableData.PageView;
+import org.apache.samza.util.CoordinatorStreamUtil;
import org.junit.Ignore;
import org.junit.Test;
@@ -110,12 +111,18 @@ public class DrainHighLevelApiIntegrationTest {
Config configFromRunner = testRunner.getConfig();
MetadataStore metadataStore =
metadataStoreFactory.getMetadataStore("NoOp", configFromRunner, new
MetricsRegistryMap());
+ // Write configs to the coordinator stream here, as neither the passthrough JC nor the StreamProcessor writes
+ // configs to the coordinator stream. RemoteApplicationRunner typically writes the configs to the metadata store
+ // before starting the JC.
+ // We do this so that DrainUtils.writeDrainNotification can read app.run.id from the config.
+ CoordinatorStreamUtil.writeConfigToCoordinatorStream(metadataStore,
configFromRunner);
+
// write drain message after a delay
ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
executorService.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
- UUID uuid = DrainUtils.writeDrainNotification(metadataStore, runId);
+ UUID uuid = DrainUtils.writeDrainNotification(metadataStore);
return uuid.toString();
}
}, 2000L, TimeUnit.MILLISECONDS);
diff --git
a/samza-test/src/test/java/org/apache/samza/test/drain/DrainLowLevelApiIntegrationTest.java
b/samza-test/src/test/java/org/apache/samza/test/drain/DrainLowLevelApiIntegrationTest.java
index 7579c6eda..ed385f035 100644
---
a/samza-test/src/test/java/org/apache/samza/test/drain/DrainLowLevelApiIntegrationTest.java
+++
b/samza-test/src/test/java/org/apache/samza/test/drain/DrainLowLevelApiIntegrationTest.java
@@ -54,6 +54,7 @@ import org.apache.samza.test.framework.TestRunner;
import
org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
import
org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
import org.apache.samza.test.table.TestTableData;
+import org.apache.samza.util.CoordinatorStreamUtil;
import org.junit.Ignore;
import org.junit.Test;
@@ -110,7 +111,7 @@ public class DrainLowLevelApiIntegrationTest {
// The test can be occasionally flaky, so we set Ignore annotation
// Remove ignore annotation and run the test as follows:
- // ./gradlew :samza-test:test --tests
org.apache.samza.test.drain.DrainHighLevelApiIntegrationTest -PscalaSuffix=2.12
+ // ./gradlew :samza-test:test --tests
org.apache.samza.test.drain.DrainLowLevelApiIntegrationTest -PscalaSuffix=2.12
@Ignore
@Test
public void testPipeline() {
@@ -139,12 +140,18 @@ public class DrainLowLevelApiIntegrationTest {
MetadataStore
metadataStore = metadataStoreFactory.getMetadataStore("NoOp",
configFromRunner, new MetricsRegistryMap());
+ // Write configs to the coordinator stream here, as neither the passthrough JC nor the StreamProcessor writes
+ // configs to the coordinator stream. RemoteApplicationRunner typically writes the configs to the metadata store
+ // before starting the JC.
+ // We do this so that DrainUtils.writeDrainNotification can read app.run.id from the config.
+ CoordinatorStreamUtil.writeConfigToCoordinatorStream(metadataStore,
configFromRunner);
+
// write drain message after a delay
ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
executorService.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
- UUID uuid = DrainUtils.writeDrainNotification(metadataStore, runId);
+ UUID uuid = DrainUtils.writeDrainNotification(metadataStore);
return uuid.toString();
}
}, 2000L, TimeUnit.MILLISECONDS);