Repository: samza Updated Branches: refs/heads/master 4394b89ef -> 2d6b19953
SAMZA-1826: Fix unit test failure in table tests The way that we verify the join counts in PageViewToProfileJoinFunction across multiple table tests (i.e. TestLocalTable, TestRemoteTable, TestLocalTableWithSideInputs) creates conflicts in the static counter map and triggers test failures under certain test execution orders. Fix it by requiring an explicit name for each join function in the tests and by registering / verifying by unique op names. Author: Yi Pan (Data Infrastructure) <[email protected]> Closes #617 from nickpan47/SAMZA-1826 and squashes the following commits: a25c484d [Yi Pan (Data Infrastructure)] SAMZA-1826: removing assertion on internal state of MapFunction in integration tests 566f13c5 [Yi Pan (Data Infrastructure)] SAMZA-1826: Fix unit test failure in table tests Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2d6b1995 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2d6b1995 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2d6b1995 Branch: refs/heads/master Commit: 2d6b19953a3660f7e2aed3d59b98b1c161333a40 Parents: 4394b89 Author: Yi Pan (Data Infrastructure) <[email protected]> Authored: Sun Aug 26 18:51:12 2018 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Sun Aug 26 18:51:12 2018 -0700 ---------------------------------------------------------------------- .../table/PageViewToProfileJoinFunction.java | 48 ++++++++++++++++++++ .../apache/samza/test/table/TestLocalTable.java | 47 +------------------ .../table/TestLocalTableWithSideInputs.java | 38 ++++++++-------- .../samza/test/table/TestRemoteTable.java | 37 +++++++-------- 4 files changed, 89 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2d6b1995/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java 
---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java b/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java new file mode 100644 index 0000000..d253284 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.test.table; + +import org.apache.samza.operators.KV; +import org.apache.samza.operators.functions.StreamTableJoinFunction; +import org.apache.samza.test.table.TestTableData.EnrichedPageView; +import org.apache.samza.test.table.TestTableData.PageView; +import org.apache.samza.test.table.TestTableData.Profile; + +/** + * A {@link StreamTableJoinFunction} used by unit tests in this package + */ +class PageViewToProfileJoinFunction implements StreamTableJoinFunction + <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> { + + @Override + public TestTableData.EnrichedPageView apply(KV<Integer, TestTableData.PageView> m, KV<Integer, TestTableData.Profile> r) { + return r == null ? 
null : new TestTableData.EnrichedPageView(m.getValue().getPageKey(), m.getKey(), r.getValue().getCompany()); + } + + @Override + public Integer getMessageKey(KV<Integer, TestTableData.PageView> message) { + return message.getKey(); + } + + @Override + public Integer getRecordKey(KV<Integer, TestTableData.Profile> record) { + return record.getKey(); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/2d6b1995/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java index 14ef751..e5775f0 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java @@ -25,8 +25,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; - -import java.util.concurrent.atomic.AtomicInteger; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; @@ -42,7 +40,6 @@ import org.apache.samza.metrics.Timer; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.functions.StreamTableJoinFunction; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; @@ -57,17 +54,13 @@ import org.apache.samza.table.ReadableTable; import org.apache.samza.table.Table; import org.apache.samza.task.TaskContext; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; -import org.apache.samza.test.table.TestTableData.EnrichedPageView; -import org.apache.samza.test.table.TestTableData.PageView; -import 
org.apache.samza.test.table.TestTableData.PageViewJsonSerde; -import org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory; -import org.apache.samza.test.table.TestTableData.Profile; -import org.apache.samza.test.table.TestTableData.ProfileJsonSerde; import org.apache.samza.test.util.ArraySystemFactory; import org.apache.samza.test.util.Base64Serializer; import org.junit.Assert; import org.junit.Test; +import static org.apache.samza.test.table.TestTableData.*; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -250,9 +243,6 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { assertEquals(count * partitionCount, sentToProfileTable1.size()); assertEquals(count * partitionCount, sentToProfileTable2.size()); - for (int i = 0; i < PageViewToProfileJoinFunction.seqNo; i++) { - assertEquals(count * partitionCount, PageViewToProfileJoinFunction.counterPerJoinFn.get(i).intValue()); - } assertEquals(count * partitionCount, joinedPageViews1.size()); assertEquals(count * partitionCount, joinedPageViews2.size()); assertTrue(joinedPageViews1.get(0) instanceof EnrichedPageView); @@ -345,39 +335,6 @@ public class TestLocalTable extends AbstractIntegrationTestHarness { } } - static class PageViewToProfileJoinFunction implements StreamTableJoinFunction - <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> { - private static Map<Integer, AtomicInteger> counterPerJoinFn = new HashMap<>(); - private static int seqNo = 0; - private final int currentSeqNo; - - public PageViewToProfileJoinFunction() { - this.currentSeqNo = seqNo++; - } - - @Override - public void init(Config config, TaskContext context) { - counterPerJoinFn.put(this.currentSeqNo, new AtomicInteger(0)); - } - - @Override - public EnrichedPageView apply(KV<Integer, PageView> m, KV<Integer, Profile> r) { - counterPerJoinFn.get(this.currentSeqNo).incrementAndGet(); - return r == null ? 
null : - new EnrichedPageView(m.getValue().getPageKey(), m.getKey(), r.getValue().getCompany()); - } - - @Override - public Integer getMessageKey(KV<Integer, PageView> message) { - return message.getKey(); - } - - @Override - public Integer getRecordKey(KV<Integer, Profile> record) { - return record.getKey(); - } - } - @Test public void testAsyncOperation() throws Exception { KeyValueStore kvStore = mock(KeyValueStore.class); http://git-wip-us.apache.org/repos/asf/samza/blob/2d6b1995/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index d9016b1..1e45f5e 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -46,8 +46,10 @@ import org.apache.samza.test.framework.stream.CollectionStream; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.junit.Test; -import static org.junit.Assert.*; +import static org.apache.samza.test.table.TestTableData.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness { private static final String PAGEVIEW_STREAM = "pageview"; @@ -72,20 +74,20 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness Arrays.asList(TestTableData.generateProfiles(5))); } - private void runTest(String systemName, StreamApplication app, List<TestTableData.PageView> pageViews, - List<TestTableData.Profile> profiles) { + private void runTest(String systemName, StreamApplication app, List<PageView> pageViews, + List<Profile> profiles) { Map<String, String> 
configs = new HashMap<>(); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName); configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName); - CollectionStream<TestTableData.PageView> pageViewStream = + CollectionStream<PageView> pageViewStream = CollectionStream.of(systemName, PAGEVIEW_STREAM, pageViews); - CollectionStream<TestTableData.Profile> profileStream = + CollectionStream<Profile> profileStream = CollectionStream.of(systemName, PROFILE_STREAM, profiles); - CollectionStream<TestTableData.EnrichedPageView> outputStream = + CollectionStream<EnrichedPageView> outputStream = CollectionStream.empty(systemName, ENRICHED_PAGEVIEW_STREAM); TestRunner @@ -97,15 +99,15 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness .run(Duration.ofMillis(100000)); try { - Map<Integer, List<TestTableData.EnrichedPageView>> result = TestRunner.consumeStream(outputStream, Duration.ofMillis(1000)); - List<TestTableData.EnrichedPageView> results = result.values().stream() + Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStream, Duration.ofMillis(1000)); + List<EnrichedPageView> results = result.values().stream() .flatMap(List::stream) .collect(Collectors.toList()); - List<TestTableData.EnrichedPageView> expectedEnrichedPageviews = pageViews.stream() + List<EnrichedPageView> expectedEnrichedPageviews = pageViews.stream() .flatMap(pv -> profiles.stream() .filter(profile -> pv.memberId == profile.memberId) - .map(profile -> new TestTableData.EnrichedPageView(pv.pageKey, profile.memberId, profile.company))) + .map(profile -> new EnrichedPageView(pv.pageKey, profile.memberId, profile.company))) .collect(Collectors.toList()); boolean successfulJoin = 
results.stream().allMatch(expectedEnrichedPageviews::contains); @@ -127,16 +129,16 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness graph.getInputStream(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()) .partitionBy(TestTableData.PageView::getMemberId, v -> v, "partition-page-view") - .join(table, new TestLocalTable.PageViewToProfileJoinFunction()) + .join(table, new PageViewToProfileJoinFunction()) .sendTo(graph.getOutputStream(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())); } - protected TableDescriptor<Integer, TestTableData.Profile, ?> getTableDescriptor() { - return new InMemoryTableDescriptor<Integer, TestTableData.Profile>(PROFILE_TABLE) - .withSerde(KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())) + protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() { + return new InMemoryTableDescriptor<Integer, Profile>(PROFILE_TABLE) + .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())) .withSideInputs(ImmutableList.of(PROFILE_STREAM)) .withSideInputsProcessor((msg, store) -> { - TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage(); + Profile profile = (Profile) msg.getMessage(); int key = profile.getMemberId(); return ImmutableList.of(new Entry<>(key, profile)); @@ -146,9 +148,9 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness static class DurablePageViewProfileJoin extends PageViewProfileJoin { @Override - protected TableDescriptor<Integer, TestTableData.Profile, ?> getTableDescriptor() { - return new RocksDbTableDescriptor<Integer, TestTableData.Profile>(PROFILE_TABLE) - .withSerde(KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde())) + protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() { + return new RocksDbTableDescriptor<Integer, Profile>(PROFILE_TABLE) + .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())) .withSideInputs(ImmutableList.of(PROFILE_STREAM)) 
.withSideInputsProcessor((msg, store) -> { TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage(); http://git-wip-us.apache.org/repos/asf/samza/blob/2d6b1995/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java index 2d07b01..eb9fbe9 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java @@ -57,10 +57,11 @@ import org.apache.samza.task.TaskContext; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; import org.apache.samza.test.util.Base64Serializer; import org.apache.samza.util.RateLimiter; +import com.google.common.cache.CacheBuilder; import org.junit.Assert; import org.junit.Test; -import com.google.common.cache.CacheBuilder; +import static org.apache.samza.test.table.TestTableData.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -70,11 +71,11 @@ import static org.mockito.Mockito.mock; public class TestRemoteTable extends AbstractIntegrationTestHarness { - static Map<String, List<TestTableData.EnrichedPageView>> writtenRecords = new HashMap<>(); + static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>(); - static class InMemoryReadFunction implements TableReadFunction<Integer, TestTableData.Profile> { + static class InMemoryReadFunction implements TableReadFunction<Integer, Profile> { private final String serializedProfiles; - private transient Map<Integer, TestTableData.Profile> profileMap; + private transient Map<Integer, Profile> profileMap; private InMemoryReadFunction(String profiles) { this.serializedProfiles = profiles; @@ -82,12 +83,12 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness { private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - TestTableData.Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, TestTableData.Profile[].class); + Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, Profile[].class); this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> p.getMemberId(), Function.identity())); } @Override - public CompletableFuture<TestTableData.Profile> getAsync(Integer key) { + public CompletableFuture<Profile> getAsync(Integer key) { return CompletableFuture.completedFuture(profileMap.get(key)); } @@ -96,8 +97,8 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { } } - static class InMemoryWriteFunction implements TableWriteFunction<Integer, TestTableData.EnrichedPageView> { - private transient List<TestTableData.EnrichedPageView> records; + static class InMemoryWriteFunction implements TableWriteFunction<Integer, EnrichedPageView> { + private transient List<EnrichedPageView> records; private String testName; public InMemoryWriteFunction(String testName) { @@ -113,7 +114,7 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { } @Override - public CompletableFuture<Void> putAsync(Integer key, TestTableData.EnrichedPageView record) { + public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView record) { records.add(record); return CompletableFuture.completedFuture(null); } @@ -147,8 +148,8 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { writtenRecords.put(testName, new ArrayList<>()); int count = 10; - TestTableData.PageView[] pageViews = TestTableData.generatePageViews(count); - String profiles = Base64Serializer.serialize(TestTableData.generateProfiles(count)); + PageView[] pageViews = generatePageViews(count); + String profiles = Base64Serializer.serialize(generateProfiles(count)); int partitionCount = 4; 
Map<String, String> configs = TestLocalTable.getBaseJobConfig(bootstrapUrl(), zkConnect()); @@ -161,32 +162,32 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { final RateLimiter writeRateLimiter = mock(RateLimiter.class); final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs)); final StreamApplication app = (streamGraph, cfg) -> { - RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1"); + RemoteTableDescriptor<Integer, Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1"); inputTableDesc .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles)) .withRateLimiter(readRateLimiter, null, null); - RemoteTableDescriptor<Integer, TestTableData.EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1"); + RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1"); outputTableDesc .withReadFunction(key -> null) // dummy reader .withWriteFunction(writer) .withRateLimiter(writeRateLimiter, null, null); - Table<KV<Integer, TestTableData.EnrichedPageView>> outputTable = streamGraph.getTable(outputTableDesc); + Table<KV<Integer, EnrichedPageView>> outputTable = streamGraph.getTable(outputTableDesc); if (withCache) { outputTable = getCachingTable(outputTable, defaultCache, "output", streamGraph); } - Table<KV<Integer, TestTableData.Profile>> inputTable = streamGraph.getTable(inputTableDesc); + Table<KV<Integer, Profile>> inputTable = streamGraph.getTable(inputTableDesc); if (withCache) { inputTable = getCachingTable(inputTable, defaultCache, "input", streamGraph); } - streamGraph.getInputStream("PageView", new NoOpSerde<TestTableData.PageView>()) + streamGraph.getInputStream("PageView", new NoOpSerde<PageView>()) .map(pv -> new KV<>(pv.getMemberId(), pv)) - .join(inputTable, new TestLocalTable.PageViewToProfileJoinFunction()) + 
.join(inputTable, new PageViewToProfileJoinFunction()) .map(m -> new KV(m.getMemberId(), m)) .sendTo(outputTable); }; @@ -196,7 +197,7 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness { int numExpected = count * partitionCount; Assert.assertEquals(numExpected, writtenRecords.get(testName).size()); - Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof TestTableData.EnrichedPageView); + Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof EnrichedPageView); } @Test
