This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 4c11067 SAMZA-2303: Exclude side inputs when handling end-of-stream
and watermarks for high-level (#1506)
4c11067 is described below
commit 4c11067b0e2d7c6cb1f9c472c0843eea861a632e
Author: Cameron Lee <[email protected]>
AuthorDate: Thu Jul 1 15:21:43 2021 -0700
SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks
for high-level (#1506)
Symptom: End-of-stream and watermarks are not properly propagated through
Samza when side inputs are used.
This prevents many tests from using the TestRunner framework, since TestRunner
relies on having tasks shut themselves down based on end-of-stream messages.
Being able to use TestRunner is helpful because it significantly decreases
test times.
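As an illustration, a minimal sketch of the kind of TestRunner test this
enables (names here are placeholders; the API calls match the tests updated
below):

    // TestRunner.run() only completes once every input SSP reaches end-of-stream,
    // which is why side-input SSPs must be excluded from that bookkeeping.
    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
    InMemoryInputDescriptor<TestTableData.PageView> input =
        isd.getInputDescriptor("pageview", new NoOpSerde<>());
    TestRunner.of(app) // app: the StreamApplication under test
        .addInputStream(input, pageViewsByPartition) // Map<Integer, List<PageView>>
        .run(Duration.ofSeconds(10)); // times out if the tasks never shut down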
Cause: OperatorImplGraph builds the EndOfStreamStates and WatermarkStates
objects with all of the input SSPs from the job model, including side-input
SSPs. However, high-level operator tasks aren't given messages from side-input
SSPs, so high-level operators should not need to handle end-of-stream and
watermarks for them. The result of this issue is that end-of-stream and
watermark handling tries to include side-input SSPs but never updates their
states, which can result in tasks not exiting.
Changes:
1. Pass the set of SSPs excluding side inputs to high-level operators so that
they don't read directly from the task model, which does include side-input
SSPs. High-level operators will then handle end-of-stream and watermark
propagation without considering side-input SSPs.
2. Change InMemoryManager to only use
IncomingMessageEnvelope.END_OF_STREAM_OFFSET when the taskName in the
EndOfStreamMessage is null. This prevents the issue from SAMZA-2300, in which
end-of-stream messages do not get properly aggregated and then broadcast to
all partitions (see SAMZA-2300 for more details). Some existing tests would
fail without this change.
3. Add a unique app.id in TestRunner for each test. This helps prevent
clashes between different tests. For example, ControlMessageSender has a static
cache keyed by the stream id of intermediate streams, and multiple tests could
end up using the same key in that cache. By using a unique app id, the
intermediate streams are unique, so multiple tests won't use the same key in
the cache.
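As a rough sketch of how change 1 fits together (simplified; the names come
from the diff below):

    // TaskInstance passes the task's SSPs minus side inputs into the task context ...
    TaskContextImpl taskContext = new TaskContextImpl(taskModel, registry, kvStoreSupplier,
        tableManager, callbackScheduler, offsetManager, jobModel, streamMetadataCache,
        sspsExcludingSideInputs);
    // ... and OperatorImplGraph seeds its control-message state from that set only:
    internalTaskContext.registerObject(EndOfStreamStates.class.getName(),
        new EndOfStreamStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts));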
API changes (impacts testing framework only):
1. The default app.id used for tests executed by TestRunner is set to the
"in-memory scope", which is a string that is randomly generated for each test.
Before this change, the app.id was not set.
2. InMemoryManager only uses IncomingMessageEnvelope.END_OF_STREAM_OFFSET
when the EndOfStreamMessage has a null taskName. Before this change,
InMemoryManager used IncomingMessageEnvelope.END_OF_STREAM_OFFSET for all
EndOfStreamMessages.
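In other words (a sketch; the new TestInMemorySystem tests below are the
runnable version):

    // A taskName marks an aggregation message: it is consumed as a regular message,
    // so the aggregation partition keeps listening for the other tasks' messages.
    new EndOfStreamMessage("task-0"); // consumed envelope: isEndOfStream() == false
    // Only a taskName-less message gets IncomingMessageEnvelope.END_OF_STREAM_OFFSET:
    new EndOfStreamMessage(); // consumed envelope: isEndOfStream() == true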
Upgrade/usage instructions:
1. If tests are written using TestRunner, and those tests rely on app.id
being unset, then those tests will need to be updated to use/read the new
app.id. Tests relying on app.id is not expected to be a common use case.
2. If the in-memory system is being used (which includes tests written
using TestRunner), and it is expected that the in-memory system sets
END_OF_STREAM_OFFSET for messages when the taskName is non-null, then that
usage will need to be removed. The taskName is intended for use by intermediate
streams, so it shouldn't be used outside of Samza internals anyway.
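For tests that do need the generated value, the app.id can be read back from
the config; a hypothetical sketch (ApplicationConfig.APP_ID is the key
TestRunner now sets, per the diff below):

    // inside application/test code running under TestRunner
    ApplicationConfig appConfig = new ApplicationConfig(config);
    String appId = appConfig.getAppId(); // now a random per-test string instead of unset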
---
.../apache/samza/context/InternalTaskContext.java | 25 ++-
.../org/apache/samza/context/TaskContextImpl.java | 27 ++-
.../samza/operators/impl/OperatorImplGraph.java | 14 +-
.../samza/system/inmemory/InMemoryManager.java | 20 +-
.../org/apache/samza/container/TaskInstance.scala | 2 +-
.../apache/samza/context/TestTaskContextImpl.java | 2 +-
.../samza/operators/impl/TestWindowOperator.java | 7 +-
.../samza/system/inmemory/TestInMemorySystem.java | 20 +-
.../apache/samza/test/framework/TestRunner.java | 8 +
.../samza/test/table/TestLocalTableEndToEnd.java | 222 ++++++++++-----------
.../TestLocalTableWithSideInputsEndToEnd.java | 188 +++++++++++------
.../org/apache/samza/test/table/TestTableData.java | 63 +++++-
12 files changed, 393 insertions(+), 205 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java b/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
index 3d3a53d..804ef93 100644
--- a/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
+++ b/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
@@ -21,12 +21,24 @@ package org.apache.samza.context;
 
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStreamPartition;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
-public class InternalTaskContext {
+/**
+ * This class is used for passing objects around for the implementation of the high-level API.
+ * 1) Container for objects that need to be passed between different components in the implementation of the high-level
+ * API.
+ * 2) The implementation of the high-level API is built on top of the low-level API. The low-level API only exposes
+ * {@link TaskContext}, but the implementation of the high-level API needs some other internal Samza components (e.g.
+ * {@link StreamMetadataCache}). We internally make these components available through {@link TaskContextImpl} so that
+ * we can do a cast to access the components. This class hides some of the messiness of casting. It's still not ideal
+ * to need to do any casting, even in this class.
+ */
+public class InternalTaskContext {
 
   private final Context context;
   private final Map<String, Object> objectRegistry = new HashMap<>();
@@ -46,6 +58,10 @@ public class InternalTaskContext {
     return context;
   }
 
+  /**
+   * TODO: The public {@link JobContext} exposes {@link JobModel} now, so can this internal method be replaced by the
+   * public API?
+   */
   public JobModel getJobModel() {
     return ((TaskContextImpl) this.context.getTaskContext()).getJobModel();
   }
@@ -53,4 +69,11 @@ public class InternalTaskContext {
   public StreamMetadataCache getStreamMetadataCache() {
     return ((TaskContextImpl) this.context.getTaskContext()).getStreamMetadataCache();
   }
+
+  /**
+   * See {@link TaskContextImpl#getSspsExcludingSideInputs()}.
+   */
+  public Set<SystemStreamPartition> getSspsExcludingSideInputs() {
+    return ((TaskContextImpl) this.context.getTaskContext()).getSspsExcludingSideInputs();
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
index edec17d..d87a5bc 100644
--- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
@@ -29,9 +29,15 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.TableManager;
 
+import java.util.Set;
 import java.util.function.Function;
 
+/**
+ * This class provides the implementation for the public {@link TaskContext} interface.
+ * It also allows us to pass certain internal Samza components around so that the implementation of the high-level API
+ * can use them (see InternalTaskContext for some more details).
+ */
 public class TaskContextImpl implements TaskContext {
   private final TaskModel taskModel;
   private final MetricsRegistry taskMetricsRegistry;
@@ -39,8 +45,13 @@ public class TaskContextImpl implements TaskContext {
   private final TableManager tableManager;
   private final CallbackScheduler callbackScheduler;
   private final OffsetManager offsetManager;
+
+  // The instance variables below are not used for implementing any public API methods. They are here so that we can
+  // pass some internal components over to the implementation of the high-level API. See InternalTaskContext.
+
   private final JobModel jobModel;
   private final StreamMetadataCache streamMetadataCache;
+  private final Set<SystemStreamPartition> sspsExcludingSideInputs;
 
   public TaskContextImpl(TaskModel taskModel,
       MetricsRegistry taskMetricsRegistry,
@@ -49,7 +60,8 @@ public class TaskContextImpl implements TaskContext {
       CallbackScheduler callbackScheduler,
       OffsetManager offsetManager,
       JobModel jobModel,
-      StreamMetadataCache streamMetadataCache) {
+      StreamMetadataCache streamMetadataCache,
+      Set<SystemStreamPartition> sspsExcludingSideInputs) {
     this.taskModel = taskModel;
     this.taskMetricsRegistry = taskMetricsRegistry;
     this.keyValueStoreProvider = keyValueStoreProvider;
@@ -58,6 +70,7 @@ public class TaskContextImpl implements TaskContext {
     this.offsetManager = offsetManager;
     this.jobModel = jobModel;
     this.streamMetadataCache = streamMetadataCache;
+    this.sspsExcludingSideInputs = sspsExcludingSideInputs;
   }
 
   @Override
@@ -101,4 +114,14 @@ public class TaskContextImpl implements TaskContext {
   public StreamMetadataCache getStreamMetadataCache() {
     return this.streamMetadataCache;
   }
-}
+
+  /**
+   * Returns the {@link SystemStreamPartition}s excluding the side-input SSPs. For the high-level API, watermarks and
+   * end-of-stream messages are propagated based on their input SSPs. However, the Samza framework does not give side
+   * input messages to the high-level operator tasks. Therefore, the operators need to know the input SSPs excluding the
+   * side input SSPs. See SAMZA-2303 for more details.
+   */
+  public Set<SystemStreamPartition> getSspsExcludingSideInputs() {
+    return this.sspsExcludingSideInputs;
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 705f0cb..19cec80 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -111,17 +111,13 @@ public class OperatorImplGraph {
       LOG.info("{} has {} producer tasks.", stream, count);
     });
 
-    // set states for end-of-stream
+    // set states for end-of-stream; don't include side inputs (see SAMZA-2303)
     internalTaskContext.registerObject(EndOfStreamStates.class.getName(),
-        new EndOfStreamStates(
-            internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
-            producerTaskCounts));
-    // set states for watermark
+        new EndOfStreamStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts));
+    // set states for watermark; don't include side inputs (see SAMZA-2303)
     internalTaskContext.registerObject(WatermarkStates.class.getName(),
-        new WatermarkStates(
-            internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
-            producerTaskCounts,
-            context.getContainerContext().getContainerMetricsRegistry()));
+        new WatermarkStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts,
+            context.getContainerContext().getContainerMetricsRegistry()));
 
     specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> {
       SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index 13ebf6e..4d44a86 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -68,7 +68,7 @@ class InMemoryManager {
     List<IncomingMessageEnvelope> messages = bufferedMessages.get(ssp);
     String offset = String.valueOf(messages.size());
 
-    if (message instanceof EndOfStreamMessage) {
+    if (shouldUseEndOfStreamOffset(message)) {
       offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET;
     }
 
@@ -224,4 +224,22 @@ class InMemoryManager {
 
     return ImmutableList.copyOf(messageEnvelopesForSSP.subList(startingOffset, messageEnvelopesForSSP.size()));
   }
+
+  /**
+   * We don't always want to use {@link IncomingMessageEnvelope#END_OF_STREAM_OFFSET} for all
+   * {@link EndOfStreamMessage}s. Certain control message flows (e.g. end-of-stream) have an aggregation partition,
+   * which needs to listen for messages from all other partitions. These aggregation messages are marked by the task
+   * name being non-null. If we use {@link IncomingMessageEnvelope#END_OF_STREAM_OFFSET} for the aggregation messages,
+   * then the aggregation partition would stop listening once it got the message from one of the tasks, but that means
+   * it would miss the aggregation messages from all other tasks. See SAMZA-2300 for more details.
+   * One other note: If there is a serializer set for the stream, then by the time the message gets to this check, it
+   * will be a byte array, so this check will not return true, even if the deserialized message was an
+   * {@link EndOfStreamMessage}. So far this isn't a problem, because we only really need this to return true for
+   * input streams (not intermediate streams), and in-memory input stream data doesn't get serialized. For intermediate
+   * streams, we don't need END_OF_STREAM_OFFSET to be used since the high-level operators take care of end-of-stream
+   * messages based on MessageType.
+   */
+  private static boolean shouldUseEndOfStreamOffset(Object message) {
+    return (message instanceof EndOfStreamMessage) && ((EndOfStreamMessage) message).getTaskName() == null;
+  }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 2ebe465..801c6bc 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -80,7 +80,7 @@ class TaskInstance(
     }
   })
   private val taskContext = new TaskContextImpl(taskModel, metrics.registry, kvStoreSupplier, tableManager,
-    new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache)
+    new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache, systemStreamPartitions)
   // need separate field for this instead of using it through Context, since Context throws an exception if it is null
   private val applicationTaskContextOption = applicationTaskContextFactoryOption
     .map(_.create(externalContextOption.orNull, jobContext, containerContext, taskContext,
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
index 0e8f78e..094583e 100644
--- a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
@@ -62,7 +62,7 @@ public class TestTaskContextImpl {
     MockitoAnnotations.initMocks(this);
     taskContext =
         new TaskContextImpl(taskModel, taskMetricsRegistry, keyValueStoreProvider, tableManager, callbackScheduler,
-            offsetManager, null, null);
+            offsetManager, null, null, null);
     when(this.taskModel.getTaskName()).thenReturn(TASK_NAME);
   }
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 76b79a7..8218720 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -36,6 +36,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.MockContext;
+import org.apache.samza.context.TaskContextImpl;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.system.descriptors.GenericSystemDescriptor;
 import org.apache.samza.job.model.TaskModel;
@@ -93,11 +94,13 @@ public class TestWindowOperator {
     Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
     Serde storeValSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
 
+    SystemStreamPartition ssp = new SystemStreamPartition("kafka", "integers", new Partition(0));
     TaskModel taskModel = mock(TaskModel.class);
-    when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet
-        .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
+    when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet.of(ssp));
     when(taskModel.getTaskName()).thenReturn(new TaskName("task 1"));
     when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+    when(((TaskContextImpl) this.context.getTaskContext()).getSspsExcludingSideInputs()).thenReturn(
+        ImmutableSet.of(ssp));
     when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     when(this.context.getTaskContext().getStore("jobName-jobId-window-w1"))
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
index 0a2e221..9439b01 100644
--- a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
@@ -127,7 +127,7 @@ public class TestInMemorySystem {
   }
 
   @Test
-  public void testEndOfStreamMessage() {
+  public void testEndOfStreamMessageWithTask() {
     EndOfStreamMessage eos = new EndOfStreamMessage("test-task");
 
     produceMessages(eos);
@@ -139,6 +139,24 @@ public class TestInMemorySystem {
     List<IncomingMessageEnvelope> results = consumeRawMessages(sspsToPoll);
 
     assertEquals(1, results.size());
+    assertEquals("test-task", ((EndOfStreamMessage) results.get(0).getMessage()).getTaskName());
+    assertFalse(results.get(0).isEndOfStream());
+  }
+
+  @Test
+  public void testEndOfStreamMessageWithoutTask() {
+    EndOfStreamMessage eos = new EndOfStreamMessage();
+
+    produceMessages(eos);
+
+    Set<SystemStreamPartition> sspsToPoll = IntStream.range(0, PARTITION_COUNT)
+        .mapToObj(partition -> new SystemStreamPartition(SYSTEM_STREAM, new Partition(partition)))
+        .collect(Collectors.toSet());
+
+    List<IncomingMessageEnvelope> results = consumeRawMessages(sspsToPoll);
+
+    assertEquals(1, results.size());
+    assertNull(((EndOfStreamMessage) results.get(0).getMessage()).getTaskName());
     assertTrue(results.get(0).isEndOfStream());
   }
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index fb65eea..943a8ca 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -107,6 +107,14 @@ public class TestRunner {
     this.configs = new HashMap<>();
     this.inMemoryScope = RandomStringUtils.random(10, true, true);
     configs.put(ApplicationConfig.APP_NAME, APP_NAME);
+    /*
+     * Use a unique app id to help make sure a test execution is isolated from others.
+     * A concrete example of where this helps is to avoid an issue with ControlMessageSender. It has a static cache
+     * keyed by stream id to save partition counts for intermediate streams. This means that different tests can
+     * collide in this cache if they use the same intermediate stream names. Having a unique app id makes the
+     * intermediate streams unique across tests.
+     */
+    configs.put(ApplicationConfig.APP_ID, this.inMemoryScope);
     configs.put(JobConfig.PROCESSOR_ID, "1");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     configs.put(JobConfig.STARTPOINT_METADATA_STORE_FACTORY, InMemoryMetadataStoreFactory.class.getCanonicalName());
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
index 78fc7b5..ea81f13 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.test.table;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -27,10 +28,8 @@ import java.util.Map;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.context.Context;
@@ -39,7 +38,6 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -47,11 +45,10 @@ import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.Table;
-import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.framework.TestRunner;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.apache.samza.test.util.ArraySystemFactory;
-import org.apache.samza.test.util.Base64Serializer;
-
-import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
@@ -62,50 +59,42 @@ import static org.apache.samza.test.table.TestTableData.Profile;
 import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 
 /**
  * This test class tests sendTo() and join() for local tables
 */
-public class TestLocalTableEndToEnd extends IntegrationTestHarness {
+public class TestLocalTableEndToEnd {
+  private static final String SYSTEM_NAME = "test";
+  private static final String PAGEVIEW_STREAM = "pageview";
+  private static final String PROFILE_STREAM = "profile";
 
   @Test
-  public void testSendTo() throws Exception {
-
-    int count = 10;
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
-    int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.Profile.samza.system", "test");
-    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
+  public void testSendTo() {
     MyMapFunction mapFn = new MyMapFunction();
+    StreamApplication app = appDesc -> {
+      Table<KV<Integer, Profile>> table =
+          appDesc.getTable(new InMemoryTableDescriptor<>("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
 
-    final StreamApplication app = appDesc -> {
-
-      Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
-          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-
-      appDesc.getInputStream(isd)
-          .map(mapFn)
-          .sendTo(table);
+      appDesc.getInputStream(isd).map(mapFn).sendTo(table);
     };
 
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
-    executeRun(runner, config);
-    runner.waitForFinish();
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
+    InMemoryInputDescriptor<Profile> profileStreamDesc = isd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
+
+    int numProfilesPerPartition = 10;
+    int numInputPartitions = 4;
+    Map<Integer, List<Profile>> inputProfiles =
+        TestTableData.generatePartitionedProfiles(numProfilesPerPartition * numInputPartitions, numInputPartitions);
+    TestRunner.of(app).addInputStream(profileStreamDesc, inputProfiles).run(Duration.ofSeconds(10));
 
-    for (int i = 0; i < partitionCount; i++) {
+    for (int i = 0; i < numInputPartitions; i++) {
       MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
-      assertEquals(count, mapFnCopy.received.size());
-      mapFnCopy.received.forEach(p -> Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null));
+      assertEquals(numProfilesPerPartition, mapFnCopy.received.size());
+      mapFnCopy.received.forEach(p -> assertNotNull(mapFnCopy.table.get(p.getMemberId())));
     }
   }
 
@@ -116,52 +105,49 @@ public class TestLocalTableEndToEnd extends IntegrationTestHarness {
     @Override
     public void describe(StreamApplicationDescriptor appDesc) {
       Table<KV<Integer, Profile>> table = appDesc.getTable(
-          new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+          new InMemoryTableDescriptor<>("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+      GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
+      profileISD.shouldBootstrap();
       appDesc.getInputStream(profileISD)
-          .map(m -> new KV(m.getMemberId(), m))
+          .map(m -> new KV<>(m.getMemberId(), m))
           .sendTo(table);
 
-      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>());
       appDesc.getInputStream(pageViewISD)
           .map(pv -> {
             received.add(pv);
             return pv;
           })
-          .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
+          .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()), "p1")
          .join(table, new PageViewToProfileJoinFunction())
          .sink((m, collector, coordinator) -> joined.add(m));
     }
   }
 
   @Test
-  public void testStreamTableJoin() throws Exception {
-
-    int count = 10;
-    PageView[] pageViews = TestTableData.generatePageViews(count);
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
+  public void testStreamTableJoin() {
+    int totalPageViews = 40;
     int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.Profile.samza.system", "test");
-    configs.put("streams.Profile.samza.bootstrap", "true");
-    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    assertEquals(count * partitionCount, StreamTableJoinApp.received.size());
-    assertEquals(count * partitionCount, StreamTableJoinApp.joined.size());
-    assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
+    Map<Integer, List<PageView>> inputPageViews =
+        TestTableData.generatePartitionedPageViews(totalPageViews, partitionCount);
+    // 10 is the max member id for page views
+    Map<Integer, List<Profile>> inputProfiles =
+        TestTableData.generatePartitionedProfiles(10, partitionCount);
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
+    InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
+        .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>());
+    InMemoryInputDescriptor<Profile> profileStreamDesc = isd
+        .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
+
+    TestRunner.of(new StreamTableJoinApp())
+        .addInputStream(pageViewStreamDesc, inputPageViews)
+        .addInputStream(profileStreamDesc, inputProfiles)
+        .run(Duration.ofSeconds(10));
+
+    assertEquals(totalPageViews, StreamTableJoinApp.received.size());
+    assertEquals(totalPageViews, StreamTableJoinApp.joined.size());
+    assertNotNull(StreamTableJoinApp.joined.get(0));
   }
 
   static class DualStreamTableJoinApp implements StreamApplication {
@@ -178,29 +164,31 @@ public class TestLocalTableEndToEnd extends IntegrationTestHarness {
       PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
       PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
 
-      Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));
+      Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor<>("t1", profileKVSerde));
 
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
-      GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+      GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor(PROFILE_STREAM + "1", new NoOpSerde<>());
+      profileISD1.shouldBootstrap();
+      GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor(PROFILE_STREAM + "2", new NoOpSerde<>());
+      profileISD2.shouldBootstrap();
       MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
       MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);
 
       profileStream1
           .map(m -> {
             sentToProfileTable1.add(m);
-            return new KV(m.getMemberId(), m);
+            return new KV<>(m.getMemberId(), m);
           })
           .sendTo(profileTable);
       profileStream2
          .map(m -> {
            sentToProfileTable2.add(m);
-            return new KV(m.getMemberId(), m);
+            return new KV<>(m.getMemberId(), m);
          })
          .sendTo(profileTable);
 
-      GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
-      GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
+      GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor(PAGEVIEW_STREAM + "1", new NoOpSerde<>());
+      GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor(PAGEVIEW_STREAM + "2", new NoOpSerde<>());
       MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
       MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);
 
@@ -217,45 +205,40 @@ public class TestLocalTableEndToEnd extends IntegrationTestHarness {
   }
 
   @Test
-  public void testDualStreamTableJoin() throws Exception {
-
-    int count = 10;
-    PageView[] pageViews = TestTableData.generatePageViews(count);
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
+  public void testDualStreamTableJoin() {
+    int totalPageViews = 40;
     int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.Profile1.samza.system", "test");
-    configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile1.samza.bootstrap", "true");
-    configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.Profile2.samza.system", "test");
-    configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile2.samza.bootstrap", "true");
-    configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.PageView1.samza.system", "test");
-    configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.PageView2.samza.system", "test");
-    configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size());
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size());
-
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size());
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size());
-    assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
-    assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
+    Map<Integer, List<PageView>> inputPageViews1 =
+        TestTableData.generatePartitionedPageViews(totalPageViews, partitionCount);
+    Map<Integer, List<PageView>> inputPageViews2 =
+        TestTableData.generatePartitionedPageViews(totalPageViews, partitionCount);
+    // 10 is the max member id for page views
+    int numProfiles = 10;
+    Map<Integer, List<Profile>> inputProfiles1 = TestTableData.generatePartitionedProfiles(numProfiles, partitionCount);
+    Map<Integer, List<Profile>> inputProfiles2 = TestTableData.generatePartitionedProfiles(numProfiles, partitionCount);
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
+    InMemoryInputDescriptor<PageView> pageViewStreamDesc1 = isd
+        .getInputDescriptor(PAGEVIEW_STREAM + "1", new NoOpSerde<>());
+    InMemoryInputDescriptor<PageView> pageViewStreamDesc2 = isd
+        .getInputDescriptor(PAGEVIEW_STREAM + "2", new NoOpSerde<>());
+    InMemoryInputDescriptor<Profile> profileStreamDesc1 = isd
+        .getInputDescriptor(PROFILE_STREAM + "1", new NoOpSerde<>());
+    InMemoryInputDescriptor<Profile> profileStreamDesc2 = isd
+        .getInputDescriptor(PROFILE_STREAM + "2", new NoOpSerde<>());
+
+    TestRunner.of(new DualStreamTableJoinApp())
+        .addInputStream(pageViewStreamDesc1, inputPageViews1)
+        .addInputStream(pageViewStreamDesc2, inputPageViews2)
+        .addInputStream(profileStreamDesc1, inputProfiles1)
+        .addInputStream(profileStreamDesc2, inputProfiles2)
+        .run(Duration.ofSeconds(10));
+
+    assertEquals(numProfiles, DualStreamTableJoinApp.sentToProfileTable1.size());
+    assertEquals(numProfiles, DualStreamTableJoinApp.sentToProfileTable2.size());
+    assertEquals(totalPageViews, DualStreamTableJoinApp.joinedPageViews1.size());
+    assertEquals(totalPageViews, DualStreamTableJoinApp.joinedPageViews2.size());
+    assertNotNull(DualStreamTableJoinApp.joinedPageViews1.get(0));
+    assertNotNull(DualStreamTableJoinApp.joinedPageViews2.get(0));
   }
 
   static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
@@ -283,8 +266,7 @@ public class TestLocalTableEndToEnd extends IntegrationTestHarness {
   }
 
   private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
-
-    private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
+    private static final Map<String, MyMapFunction> TASK_TO_MAP_FUNCTION_MAP = new HashMap<>();
 
     private transient List<Profile> received;
     private transient ReadWriteTable table;
@@ -294,17 +276,17 @@ public class TestLocalTableEndToEnd extends IntegrationTestHarness {
       table = context.getTaskContext().getTable("t1");
       this.received = new ArrayList<>();
 
-      taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
+      TASK_TO_MAP_FUNCTION_MAP.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
     }
 
     @Override
     public KV<Integer, Profile> apply(Profile profile) {
       received.add(profile);
-      return new KV(profile.getMemberId(), profile);
+      return new KV<>(profile.getMemberId(), profile);
     }
 
     public static MyMapFunction getMapFunctionByTask(String taskName) {
-      return taskToMapFunctionMap.get(taskName);
+      return TASK_TO_MAP_FUNCTION_MAP.get(taskName);
    }
  }
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
index 071f65e..34ac29a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
@@ -19,128 +19,190 @@
 
 package org.apache.samza.test.table;
 
-import com.google.common.collect.ImmutableList;
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
-import org.apache.samza.SamzaException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
-import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.framework.StreamAssert;
 import org.apache.samza.test.framework.TestRunner;
 import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
 import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
 import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
-import org.apache.samza.test.harness.IntegrationTestHarness;
 import org.junit.Test;
 
 import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
 import static org.apache.samza.test.table.TestTableData.PageView;
 import static org.apache.samza.test.table.TestTableData.Profile;
 import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
-public class TestLocalTableWithSideInputsEndToEnd extends IntegrationTestHarness {
+public class TestLocalTableWithSideInputsEndToEnd {
+  private static final String SYSTEM_NAME = "test";
   private static final String PAGEVIEW_STREAM = "pageview";
   private static final String PROFILE_STREAM = "profile";
+  private static final String PROFILE_TABLE = "profile-table";
   private static final String ENRICHED_PAGEVIEW_STREAM = "enrichedpageview";
+  private static final SystemStream OUTPUT_SYSTEM_STREAM = new SystemStream(SYSTEM_NAME, ENRICHED_PAGEVIEW_STREAM);
 
   @Test
-  public void testJoinWithSideInputsTable() {
+  public void testLowLevelJoinWithSideInputsTable() throws InterruptedException {
+    int partitionCount = 4;
+    IntegerSerde integerSerde = new IntegerSerde();
+    // for low-level, need to pre-partition the input in the same way that the profiles are partitioned
+    Map<Integer, List<PageView>> pageViewsPartitionedByMemberId =
+        TestTableData.generatePartitionedPageViews(20, partitionCount)
+            .values()
+            .stream()
+            .flatMap(List::stream)
+            .collect(Collectors.groupingBy(
+                pageView -> Math.abs(Arrays.hashCode(integerSerde.toBytes(pageView.getMemberId()))) % partitionCount));
+    runTest(
+        new LowLevelPageViewProfileJoin(),
+        pageViewsPartitionedByMemberId,
+        TestTableData.generatePartitionedProfiles(10, partitionCount));
+  }
+
+  @Test
+  public void testJoinWithSideInputsTable() throws InterruptedException {
     runTest(
-        "test",
         new PageViewProfileJoin(),
-        Arrays.asList(TestTableData.generatePageViews(10)),
-        Arrays.asList(TestTableData.generateProfiles(10)));
+        TestTableData.generatePartitionedPageViews(20, 4),
+        TestTableData.generatePartitionedProfiles(10, 4));
   }
 
   @Test
-  public void testJoinWithDurableSideInputTable() {
+  public void testJoinWithDurableSideInputTable() throws InterruptedException {
     runTest(
-        "test",
         new DurablePageViewProfileJoin(),
-        Arrays.asList(TestTableData.generatePageViews(5)),
-        Arrays.asList(TestTableData.generateProfiles(5)));
+        TestTableData.generatePartitionedPageViews(20, 4),
+        TestTableData.generatePartitionedProfiles(10, 4));
   }
 
-  private void runTest(String systemName, StreamApplication app, List<PageView> pageViews,
-      List<Profile> profiles) {
-    Map<String, String> configs = new HashMap<>();
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, PAGEVIEW_STREAM), systemName);
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, PROFILE_STREAM), systemName);
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, ENRICHED_PAGEVIEW_STREAM), systemName);
-
-    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
-
+  private <T extends ApplicationDescriptor<?>> void runTest(SamzaApplication<T> app,
+      Map<Integer, List<PageView>> pageViews,
+      Map<Integer, List<Profile>> profiles) throws InterruptedException {
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
     InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
-        .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<PageView>());
-
+        .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>());
     InMemoryInputDescriptor<Profile> profileStreamDesc = isd
-        .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<Profile>());
-
+        .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
     InMemoryOutputDescriptor<EnrichedPageView> outputStreamDesc = isd
-        .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<EnrichedPageView>());
+        .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>());
 
-    TestRunner
-        .of(app)
+    TestRunner.of(app)
        .addInputStream(pageViewStreamDesc, pageViews)
        .addInputStream(profileStreamDesc, profiles)
        .addOutputStream(outputStreamDesc, 1)
-        .addConfig(new MapConfig(configs))
-        .run(Duration.ofMillis(100000));
-
-    try {
-      Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
-      List<EnrichedPageView> results = result.values().stream()
-          .flatMap(List::stream)
-          .collect(Collectors.toList());
-
-      List<EnrichedPageView> expectedEnrichedPageviews = pageViews.stream()
-          .flatMap(pv -> profiles.stream()
-              .filter(profile -> pv.memberId == profile.memberId)
-              .map(profile -> new EnrichedPageView(pv.pageKey, profile.memberId, profile.company)))
-          .collect(Collectors.toList());
-
-      boolean successfulJoin = results.stream().allMatch(expectedEnrichedPageviews::contains);
-      assertEquals("Mismatch between the expected and actual join count", expectedEnrichedPageviews.size(), results.size());
-      assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin);
-    } catch (SamzaException e) {
-      e.printStackTrace();
+        .run(Duration.ofSeconds(10));
+
+    List<EnrichedPageView> expectedEnrichedPageViews = buildExpectedEnrichedPageViews(pageViews, profiles);
+    StreamAssert.containsInAnyOrder(expectedEnrichedPageViews, outputStreamDesc, Duration.ofSeconds(1));
+  }
+
+  private static List<EnrichedPageView> buildExpectedEnrichedPageViews(Map<Integer, List<PageView>> pageViews,
+      Map<Integer, List<Profile>> profiles) {
+    ImmutableMap.Builder<Integer, Profile> profilesByMemberIdBuilder = new ImmutableMap.Builder<>();
+    profiles.values()
+        .stream()
+        .flatMap(List::stream)
+        .forEach(profile -> profilesByMemberIdBuilder.put(profile.getMemberId(), profile));
+    Map<Integer, Profile> profilesByMemberId = profilesByMemberIdBuilder.build();
+    ImmutableList.Builder<EnrichedPageView> enrichedPageViewsBuilder = new ImmutableList.Builder<>();
+    pageViews.values()
+        .stream()
+        .flatMap(List::stream)
+        .forEach(pageView -> Optional.ofNullable(profilesByMemberId.get(pageView.getMemberId()))
+            .ifPresent(profile -> enrichedPageViewsBuilder.add(
+                new EnrichedPageView(pageView.getPageKey(), profile.getMemberId(), profile.getCompany()))));
+    return enrichedPageViewsBuilder.build();
+  }
+
+  static class LowLevelPageViewProfileJoin implements TaskApplication {
+    @Override
+    public void describe(TaskApplicationDescriptor appDescriptor) {
+      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+      appDescriptor.withInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>()));
+      appDescriptor.withInputStream(sd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>()));
+
+      TableDescriptor<Integer, Profile, ?> tableDescriptor = new InMemoryTableDescriptor<>(PROFILE_TABLE,
+          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())).withSideInputs(ImmutableList.of(PROFILE_STREAM))
+          .withSideInputsProcessor((msg, store) -> {
+            Profile profile = (Profile) msg.getMessage();
+            int key = profile.getMemberId();
+            return ImmutableList.of(new Entry<>(key, profile));
+          });
+      appDescriptor.withTable(tableDescriptor);
+
+      appDescriptor.withOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>()));
+
+      appDescriptor.withTaskFactory((StreamTaskFactory) PageViewProfileJoinStreamTask::new);
     }
   }
 
-  static class PageViewProfileJoin implements StreamApplication {
-    static final String PROFILE_TABLE = "profile-table";
+  static class PageViewProfileJoinStreamTask implements InitableTask, StreamTask {
+    private ReadWriteTable<Integer, Profile> profileTable;
 
     @Override
+    public void init(Context context) {
+      this.profileTable = context.getTaskContext().getTable(PROFILE_TABLE);
+    }
+
+    @Override
+    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+      PageView pageView = (PageView) envelope.getMessage();
+      Profile profile = this.profileTable.get(pageView.getMemberId());
+      if (profile != null) {
+        EnrichedPageView enrichedPageView =
+            new EnrichedPageView(pageView.getPageKey(), profile.getMemberId(), profile.getCompany());
+        collector.send(new OutgoingMessageEnvelope(OUTPUT_SYSTEM_STREAM, enrichedPageView));
+      }
+    }
+  }
+
+  static class PageViewProfileJoin implements StreamApplication {
+    @Override
     public void describe(StreamApplicationDescriptor appDescriptor) {
       Table<KV<Integer, TestTableData.Profile>> table = appDescriptor.getTable(getTableDescriptor());
-      KafkaSystemDescriptor sd =
-          new KafkaSystemDescriptor("test");
+      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor(SYSTEM_NAME);
       appDescriptor.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
-          .partitionBy(TestTableData.PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "partition-page-view")
+          .partitionBy(TestTableData.PageView::getMemberId, v -> v,
+              KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "partition-page-view")
          .join(table, new PageViewToProfileJoinFunction())
          .sendTo(appDescriptor.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
     }
 
     protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
-      return new InMemoryTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+      return new InMemoryTableDescriptor<>(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
          .withSideInputs(ImmutableList.of(PROFILE_STREAM))
          .withSideInputsProcessor((msg, store) -> {
            Profile profile = (Profile) msg.getMessage();
@@ -153,7 +215,7 @@ public class TestLocalTableWithSideInputsEndToEnd extends IntegrationTestHarness
   static class DurablePageViewProfileJoin extends PageViewProfileJoin {
     @Override
     protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
-      return new RocksDbTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+      return new RocksDbTableDescriptor<>(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
          .withSideInputs(ImmutableList.of(PROFILE_STREAM))
          .withSideInputsProcessor((msg, store) -> {
            TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
@@ -162,4 +224,4 @@ public class TestLocalTableWithSideInputsEndToEnd extends IntegrationTestHarness
          });
     }
   }
-}
\ No newline at end of file
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
index 76c56b0..39f9b02 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
@@ -20,19 +20,29 @@
 package org.apache.samza.test.table;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.core.type.TypeReference;
 
 public class TestTableData {
+  private static final IntegerSerde INTEGER_SERDE = new IntegerSerde();
 
   public static class PageView implements Serializable {
     @JsonProperty("pageKey")
@@ -205,6 +215,35 @@ public class TestTableData {
     return pageviews;
   }
 
+  /**
+   * Create page views and spread out page views with the same member id across different partitions.
+   * Member ids are spread out like this to make sure that partitionBy operators properly repartition the messages.
+   * Member ids are assigned round-robin from [0, 10).
+   *
+   * Example
+   * generatePartitionedPageViews(20, 4) will return:
+   * 0 -> page views with member ids [0, 5)
+   * 1 -> page views with member ids [5, 10)
+   * 2 -> page views with member ids [0, 5)
+   * 3 -> page views with member ids [5, 10)
+   */
+  public static Map<Integer, List<PageView>> generatePartitionedPageViews(int numPageViews, int partitionCount) {
+    Preconditions.checkArgument(numPageViews % partitionCount == 0, "partitionCount must divide numPageViews evenly");
+    int numPerPartition = numPageViews / partitionCount;
+    Random random = new Random();
+    ImmutableMap.Builder<Integer, List<PageView>> pageViewsBuilder = new ImmutableMap.Builder<>();
+    for (int i = 0; i < partitionCount; i++) {
+      pageViewsBuilder.put(i, new ArrayList<>());
+    }
+    Map<Integer, List<PageView>> pageViews = pageViewsBuilder.build();
+    for (int i = 0; i < numPageViews; i++) {
+      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
+      int memberId = i % 10;
+      pageViews.get(i / numPerPartition).add(new PageView(pagekey, memberId));
+    }
+    return pageViews;
+  }
+
   static public PageView[] generatePageViewsWithDistinctKeys(int count) {
     Random random = new Random();
     PageView[] pageviews = new PageView[count];
@@ -227,4 +266,20 @@ public class TestTableData {
     return profiles;
   }
 
+  /**
+   * Create profiles and partition them based on the bytes representation of the member id. This uses the bytes
+   * representation for partitioning because this needs to use the same partition function as the InMemorySystemProducer
+   * (which is used in the test framework) so that table joins can be tested.
+   * One profile for each member id in [0, numProfiles) is created.
+   */
+  public static Map<Integer, List<Profile>> generatePartitionedProfiles(int numProfiles, int partitionCount) {
+    Random random = new Random();
+    return IntStream.range(0, numProfiles)
+        .mapToObj(i -> {
+          String company = COMPANIES[random.nextInt(COMPANIES.length - 1)];
+          return new Profile(i, company);
+        })
+        .collect(Collectors.groupingBy(
+            profile -> Math.abs(Arrays.hashCode(INTEGER_SERDE.toBytes(profile.getMemberId()))) % partitionCount));
+  }
 }
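For reference, the two generators above are used together like this in the
updated tests (a sketch using the same numbers as the tests):

    // 20 page views spread across 4 partitions; member ids cycle through [0, 10)
    Map<Integer, List<TestTableData.PageView>> pageViews =
        TestTableData.generatePartitionedPageViews(20, 4);
    // one profile per member id in [0, 10), partitioned by the hashed serialized member id
    Map<Integer, List<TestTableData.Profile>> profiles =
        TestTableData.generatePartitionedProfiles(10, 4);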