Repository: samza Updated Branches: refs/heads/master e310c5cd2 -> c1ea1fda7
- Fixed serde issues when persisting offsets locally - Added unit tests for TaskSideInputStorageManager - Added integration tests for table integration Author: bharathkk <[email protected]> Author: Bharath Kumarasubramanian <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #594 from bharathkk/side-input-tests Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c1ea1fda Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c1ea1fda Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c1ea1fda Branch: refs/heads/master Commit: c1ea1fda7724a6e89872a2640e2a4ec66c94c916 Parents: e310c5c Author: bharathkk <[email protected]> Authored: Fri Aug 3 17:21:33 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Fri Aug 3 17:21:33 2018 -0700 ---------------------------------------------------------------------- .../storage/TaskSideInputStorageManager.java | 38 ++- .../TestTaskSideInputStorageManager.java | 295 +++++++++++++++++++ .../table/TestLocalTableWithSideInputs.java | 161 ++++++++++ .../apache/samza/test/table/TestTableData.java | 22 +- 4 files changed, 506 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c1ea1fda/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java index 66c9106..3982623 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java @@ -19,6 +19,7 @@ package org.apache.samza.storage; +import 
com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import java.io.File; @@ -38,6 +39,7 @@ import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.container.TaskName; +import org.apache.samza.serializers.model.SamzaObjectMapper; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.IncomingMessageEnvelope; @@ -50,6 +52,8 @@ import org.apache.samza.util.Clock; import org.apache.samza.util.FileUtil; import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectWriter; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.JavaConverters; @@ -63,7 +67,10 @@ public class TaskSideInputStorageManager { private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputStorageManager.class); private static final String OFFSET_FILE = "SIDE-INPUT-OFFSETS"; private static final long STORE_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1); // same as changelog delete retention - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = SamzaObjectMapper.getObjectMapper(); + private static final TypeReference<HashMap<SystemStreamPartition, String>> OFFSETS_TYPE_REFERENCE = + new TypeReference<HashMap<SystemStreamPartition, String>>() { }; + private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writerWithType(OFFSETS_TYPE_REFERENCE); private final Clock clock; private final Map<String, SideInputsProcessor> storeToProcessor; @@ -185,6 +192,14 @@ public class TaskSideInputStorageManager { } /** + * For unit testing only + */ + @VisibleForTesting + void updateLastProcessedOffset(SystemStreamPartition ssp, String offset) { + lastProcessedOffsets.put(ssp, offset); + } + + /** * Processes the incoming side input message 
envelope and updates the last processed offset for its SSP. * * @param message incoming message to be processed @@ -221,7 +236,7 @@ public class TaskSideInputStorageManager { FileUtil.rm(storeLocation); } - if (!storeLocation.exists()) { + if (isPersistedStore(storeName) && !storeLocation.exists()) { LOG.info("Creating {} as the store directory for the side input store {}", storePath, storeName); storeLocation.mkdirs(); } @@ -232,7 +247,8 @@ public class TaskSideInputStorageManager { * Writes the offset files for all side input stores one by one. There is one offset file per store. * Its contents are a JSON encoded mapping from each side input SSP to its last processed offset, and a checksum. */ - private void writeOffsetFiles() { + @VisibleForTesting + void writeOffsetFiles() { storeToSSps.entrySet().stream() .filter(entry -> isPersistedStore(entry.getKey())) // filter out in-memory side input stores .forEach((entry) -> { @@ -242,7 +258,7 @@ public class TaskSideInputStorageManager { .collect(Collectors.toMap(Function.identity(), lastProcessedOffsets::get)); try { - String fileContents = OBJECT_MAPPER.writeValueAsString(offsets); + String fileContents = OBJECT_WRITER.writeValueAsString(offsets); File offsetFile = new File(getStoreLocation(storeName), OFFSET_FILE); FileUtil.writeWithChecksum(offsetFile, fileContents); } catch (Exception e) { @@ -257,7 +273,8 @@ public class TaskSideInputStorageManager { * @return a {@link Map} of {@link SystemStreamPartition} to offset in the offset files. 
*/ @SuppressWarnings("unchecked") - private Map<SystemStreamPartition, String> getFileOffsets() { + @VisibleForTesting + Map<SystemStreamPartition, String> getFileOffsets() { LOG.info("Loading initial offsets from the file for side input stores."); Map<SystemStreamPartition, String> fileOffsets = new HashMap<>(); @@ -268,7 +285,7 @@ public class TaskSideInputStorageManager { if (isValidSideInputStore(storeName, storeLocation)) { try { String fileContents = StorageManagerUtil.readOffsetFile(storeLocation, OFFSET_FILE); - Map<SystemStreamPartition, String> offsets = OBJECT_MAPPER.readValue(fileContents, Map.class); + Map<SystemStreamPartition, String> offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE); fileOffsets.putAll(offsets); } catch (Exception e) { LOG.warn("Failed to load the offset file for side input store:" + storeName, e); @@ -279,7 +296,8 @@ public class TaskSideInputStorageManager { return fileOffsets; } - private File getStoreLocation(String storeName) { + @VisibleForTesting + File getStoreLocation(String storeName) { return new File(storeBaseDir, (storeName + File.separator + taskName.toString()).replace(' ', '_')); } @@ -292,7 +310,8 @@ public class TaskSideInputStorageManager { * @param oldestOffsets oldest offsets from the source * @return a {@link Map} of {@link SystemStreamPartition} to offset */ - private Map<SystemStreamPartition, String> getStartingOffsets( + @VisibleForTesting + Map<SystemStreamPartition, String> getStartingOffsets( Map<SystemStreamPartition, String> fileOffsets, Map<SystemStreamPartition, String> oldestOffsets) { Map<SystemStreamPartition, String> startingOffsets = new HashMap<>(); @@ -317,7 +336,8 @@ public class TaskSideInputStorageManager { * * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset. 
*/ - private Map<SystemStreamPartition, String> getOldestOffsets() { + @VisibleForTesting + Map<SystemStreamPartition, String> getOldestOffsets() { Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>(); // Step 1 http://git-wip-us.apache.org/repos/asf/samza/blob/c1ea1fda/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java new file mode 100644 index 0000000..2d60f7b --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.storage; + +import com.google.common.collect.ImmutableSet; +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.apache.samza.util.ScalaJavaUtil; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +public class TestTaskSideInputStorageManager { + private static final String LOGGED_STORE_DIR = System.getProperty("java.io.tmpdir") + File.separator + "logged-store"; + private static final String NON_LOGGED_STORE_DIR = System.getProperty("java.io.tmpdir") + File.separator + "non-logged-store"; + + @Test + public void testInit() { + final String storeName = "test-init-store"; + final String taskName = "test-init-task"; + + TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR) + .addLoggedStore(storeName, ImmutableSet.of()) + .build(); + + initializeSideInputStorageManager(testSideInputStorageManager); + + File storeDir = testSideInputStorageManager.getStoreLocation(storeName); + assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists()); + } + + @Test + public void testFlush() { + final String storeName = "test-flush-store"; + final String taskName = "test-flush-task"; + final SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0)); + final String offset = "123"; + + TaskSideInputStorageManager testSideInputStorageManager = new 
MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR) + .addLoggedStore(storeName, ImmutableSet.of(ssp)) + .build(); + Map<String, StorageEngine> stores = new HashMap<>(); + + initializeSideInputStorageManager(testSideInputStorageManager); + testSideInputStorageManager.updateLastProcessedOffset(ssp, offset); + testSideInputStorageManager.flush(); + + for (StorageEngine storageEngine : stores.values()) { + verify(storageEngine).flush(); + } + + verify(testSideInputStorageManager).writeOffsetFiles(); + + File storeDir = testSideInputStorageManager.getStoreLocation(storeName); + assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists()); + + Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets(); + assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp)); + assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset); + } + + @Test + public void testStop() { + final String storeName = "test-stop-store"; + final String taskName = "test-stop-task"; + + TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, NON_LOGGED_STORE_DIR) + .addInMemoryStore(storeName, ImmutableSet.of()) + .build(); + + initializeSideInputStorageManager(testSideInputStorageManager); + testSideInputStorageManager.stop(); + + verify(testSideInputStorageManager.getStore(storeName)).stop(); + verify(testSideInputStorageManager).writeOffsetFiles(); + } + + @Test + public void testWriteOffsetFilesForNonPersistedStore() { + final String storeName = "test-write-offset-non-persisted-store"; + final String taskName = "test-write-offset-for-non-persisted-task"; + + TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, NON_LOGGED_STORE_DIR) + .addInMemoryStore(storeName, ImmutableSet.of()) + .build(); + + 
initializeSideInputStorageManager(testSideInputStorageManager); + testSideInputStorageManager.writeOffsetFiles(); // should be no-op + File storeDir = testSideInputStorageManager.getStoreLocation(storeName); + + assertFalse("Store directory: " + storeDir.getPath() + " should not be created for non-persisted store", storeDir.exists()); + } + + @Test + public void testWriteOffsetFilesForPersistedStore() { + final String storeName = "test-write-offset-persisted-store"; + final String storeName2 = "test-write-offset-persisted-store-2"; + + final String taskName = "test-write-offset-for-persisted-task"; + final String offset = "123"; + final SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0)); + final SystemStreamPartition ssp2 = new SystemStreamPartition("test-system2", "test-stream2", new Partition(0)); + + TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR) + .addLoggedStore(storeName, ImmutableSet.of(ssp)) + .addLoggedStore(storeName2, ImmutableSet.of(ssp2)) + .build(); + + initializeSideInputStorageManager(testSideInputStorageManager); + testSideInputStorageManager.updateLastProcessedOffset(ssp, offset); + testSideInputStorageManager.updateLastProcessedOffset(ssp2, offset); + testSideInputStorageManager.writeOffsetFiles(); + File storeDir = testSideInputStorageManager.getStoreLocation(storeName); + + assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists()); + + Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets(); + + assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp)); + assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset); + + assertTrue("Failed to get offset for ssp: " + ssp2.toString() + " from file.", fileOffsets.containsKey(ssp2)); + assertEquals("Mismatch 
between last processed offset and file offset.", fileOffsets.get(ssp2), offset); + } + + @Test + public void testGetFileOffsets() { + final String storeName = "test-get-file-offsets-store"; + final String taskName = "test-get-file-offsets-task"; + final String offset = "123"; + + Set<SystemStreamPartition> ssps = IntStream.range(1, 6) + .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx))) + .collect(Collectors.toSet()); + + TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR) + .addLoggedStore(storeName, ssps) + .build(); + + initializeSideInputStorageManager(testSideInputStorageManager); + ssps.forEach(ssp -> testSideInputStorageManager.updateLastProcessedOffset(ssp, offset)); + testSideInputStorageManager.writeOffsetFiles(); + + Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets(); + + ssps.forEach(ssp -> { + assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp)); + assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset); + }); + } + + @Test + public void testGetStartingOffsets() { + final String storeName = "test-get-starting-offset-store"; + final String taskName = "test-get-starting-offset-task"; + + Set<SystemStreamPartition> ssps = IntStream.range(1, 6) + .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx))) + .collect(Collectors.toSet()); + + + TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR) + .addLoggedStore(storeName, ssps) + .build(); + + initializeSideInputStorageManager(testSideInputStorageManager); + Map<SystemStreamPartition, String> fileOffsets = ssps.stream() + .collect(Collectors.toMap(Function.identity(), ssp -> { + int partitionId = ssp.getPartition().getPartitionId(); + int 
offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId; + return String.valueOf(offset); + })); + + Map<SystemStreamPartition, String> oldestOffsets = ssps.stream() + .collect(Collectors.toMap(Function.identity(), ssp -> { + int partitionId = ssp.getPartition().getPartitionId(); + int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10; + + return String.valueOf(offset); + })); + + doCallRealMethod().when(testSideInputStorageManager).getStartingOffsets(fileOffsets, oldestOffsets); + + Map<SystemStreamPartition, String> startingOffsets = + testSideInputStorageManager.getStartingOffsets(fileOffsets, oldestOffsets); + + assertTrue("Failed to get starting offsets for all ssps", startingOffsets.size() == 5); + } + + private void initializeSideInputStorageManager(TaskSideInputStorageManager testSideInputStorageManager) { + doReturn(new HashMap<>()).when(testSideInputStorageManager).getStartingOffsets(any(), any()); + testSideInputStorageManager.init(); + } + + private static final class MockTaskSideInputStorageManagerBuilder { + private final TaskName taskName; + private final String storeBaseDir; + + private Clock clock = mock(Clock.class); + private Map<String, SideInputsProcessor> storeToProcessor = new HashMap<>(); + private Map<String, StorageEngine> stores = new HashMap<>(); + private Map<String, Set<SystemStreamPartition>> storeToSSps = new HashMap<>(); + private StreamMetadataCache streamMetadataCache = mock(StreamMetadataCache.class); + private SystemAdmins systemAdmins = mock(SystemAdmins.class); + + public MockTaskSideInputStorageManagerBuilder(String taskName, String storeBaseDir) { + this.taskName = new TaskName(taskName); + this.storeBaseDir = storeBaseDir; + + initializeMocks(); + } + + private void initializeMocks() { + SystemAdmin admin = mock(SystemAdmin.class); + doAnswer(invocation -> { + String offset1 = invocation.getArgumentAt(0, String.class); + String offset2 = invocation.getArgumentAt(1, String.class); + + return 
Long.compare(Long.parseLong(offset1), Long.parseLong(offset2)); + }).when(admin).offsetComparator(any(), any()); + doAnswer(invocation -> { + Map<SystemStreamPartition, String> sspToOffsets = invocation.getArgumentAt(0, Map.class); + + return sspToOffsets.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, + entry -> String.valueOf(Long.parseLong(entry.getValue()) + 1))); + }).when(admin).getOffsetsAfter(any()); + doReturn(admin).when(systemAdmins).getSystemAdmin("test-system"); + + doReturn(ScalaJavaUtil.toScalaMap(new HashMap<>())).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean()); + } + + MockTaskSideInputStorageManagerBuilder addInMemoryStore(String storeName, Set<SystemStreamPartition> ssps) { + StorageEngine storageEngine = mock(StorageEngine.class); + when(storageEngine.getStoreProperties()).thenReturn( + new StoreProperties.StorePropertiesBuilder().setLoggedStore(false).setPersistedToDisk(false).build()); + + stores.put(storeName, storageEngine); + storeToProcessor.put(storeName, mock(SideInputsProcessor.class)); + storeToSSps.put(storeName, ssps); + + return this; + } + + MockTaskSideInputStorageManagerBuilder addLoggedStore(String storeName, Set<SystemStreamPartition> ssps) { + StorageEngine storageEngine = mock(StorageEngine.class); + when(storageEngine.getStoreProperties()).thenReturn( + new StoreProperties.StorePropertiesBuilder().setLoggedStore(false).setPersistedToDisk(true).build()); + + stores.put(storeName, storageEngine); + storeToProcessor.put(storeName, mock(SideInputsProcessor.class)); + storeToSSps.put(storeName, ssps); + + return this; + } + + TaskSideInputStorageManager build() { + return spy(new TaskSideInputStorageManager(taskName, streamMetadataCache, storeBaseDir, stores, + storeToProcessor, storeToSSps, systemAdmins, mock(Config.class), clock)); + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/samza/blob/c1ea1fda/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java new file mode 100644 index 0000000..d9016b1 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.test.table; + +import com.google.common.collect.ImmutableList; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StreamConfig; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.RocksDbTableDescriptor; +import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor; +import org.apache.samza.table.Table; +import org.apache.samza.test.framework.TestRunner; +import org.apache.samza.test.framework.stream.CollectionStream; +import org.apache.samza.test.harness.AbstractIntegrationTestHarness; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness { + private static final String PAGEVIEW_STREAM = "pageview"; + private static final String PROFILE_STREAM = "profile"; + private static final String ENRICHED_PAGEVIEW_STREAM = "enrichedpageview"; + + @Test + public void testJoinWithSideInputsTable() { + runTest( + "side-input-join", + new PageViewProfileJoin(), + Arrays.asList(TestTableData.generatePageViews(10)), + Arrays.asList(TestTableData.generateProfiles(10))); + } + + @Test + public void testJoinWithDurableSideInputTable() { + runTest( + "durable-side-input", + new DurablePageViewProfileJoin(), + Arrays.asList(TestTableData.generatePageViews(5)), + Arrays.asList(TestTableData.generateProfiles(5))); + } + + private 
void runTest(String systemName, StreamApplication app, List<TestTableData.PageView> pageViews, + List<TestTableData.Profile> profiles) { + Map<String, String> configs = new HashMap<>(); + configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName); + configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName); + configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName); + configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName); + + CollectionStream<TestTableData.PageView> pageViewStream = + CollectionStream.of(systemName, PAGEVIEW_STREAM, pageViews); + CollectionStream<TestTableData.Profile> profileStream = + CollectionStream.of(systemName, PROFILE_STREAM, profiles); + + CollectionStream<TestTableData.EnrichedPageView> outputStream = + CollectionStream.empty(systemName, ENRICHED_PAGEVIEW_STREAM); + + TestRunner + .of(app) + .addInputStream(pageViewStream) + .addInputStream(profileStream) + .addOutputStream(outputStream) + .addConfigs(new MapConfig(configs)) + .run(Duration.ofMillis(100000)); + + try { + Map<Integer, List<TestTableData.EnrichedPageView>> result = TestRunner.consumeStream(outputStream, Duration.ofMillis(1000)); + List<TestTableData.EnrichedPageView> results = result.values().stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + + List<TestTableData.EnrichedPageView> expectedEnrichedPageviews = pageViews.stream() + .flatMap(pv -> profiles.stream() + .filter(profile -> pv.memberId == profile.memberId) + .map(profile -> new TestTableData.EnrichedPageView(pv.pageKey, profile.memberId, profile.company))) + .collect(Collectors.toList()); + + boolean successfulJoin = results.stream().allMatch(expectedEnrichedPageviews::contains); + assertEquals("Mismatch between the expected and actual join count", results.size(), + expectedEnrichedPageviews.size()); + assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin); 
+ + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + static class PageViewProfileJoin implements StreamApplication { + static final String PROFILE_TABLE = "profile-table"; + + @Override + public void init(StreamGraph graph, Config config) { + Table<KV<Integer, TestTableData.Profile>> table = graph.getTable(getTableDescriptor()); + + graph.getInputStream(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()) + .partitionBy(TestTableData.PageView::getMemberId, v -> v, "partition-page-view") + .join(table, new TestLocalTable.PageViewToProfileJoinFunction()) + .sendTo(graph.getOutputStream(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())); + } + + protected TableDescriptor<Integer, TestTableData.Profile, ?> getTableDescriptor() { + return new InMemoryTableDescriptor<Integer, TestTableData.Profile>(PROFILE_TABLE) + .withSerde(KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())) + .withSideInputs(ImmutableList.of(PROFILE_STREAM)) + .withSideInputsProcessor((msg, store) -> { + TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage(); + int key = profile.getMemberId(); + + return ImmutableList.of(new Entry<>(key, profile)); + }); + } + } + + static class DurablePageViewProfileJoin extends PageViewProfileJoin { + @Override + protected TableDescriptor<Integer, TestTableData.Profile, ?> getTableDescriptor() { + return new RocksDbTableDescriptor<Integer, TestTableData.Profile>(PROFILE_TABLE) + .withSerde(KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())) + .withSideInputs(ImmutableList.of(PROFILE_STREAM)) + .withSideInputsProcessor((msg, store) -> { + TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage(); + int key = profile.getMemberId(); + + return ImmutableList.of(new Entry<>(key, profile)); + }); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c1ea1fda/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java 
---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java index dfd0d1b..ed73961 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java @@ -20,8 +20,8 @@ package org.apache.samza.test.table; import java.io.Serializable; +import java.util.Objects; import java.util.Random; - import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.serializers.Serde; @@ -112,6 +112,26 @@ public class TestTableData { super(pageKey, memberId); this.company = company; } + + @Override + public int hashCode() { + return Objects.hash(company, memberId, pageKey); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + EnrichedPageView that = (EnrichedPageView) o; + return Objects.equals(company, that.company) && Objects.equals(memberId, that.memberId) && Objects.equals(pageKey, + that.pageKey); + } } public static class PageViewJsonSerdeFactory implements SerdeFactory<PageView> {
