Repository: samza
Updated Branches:
  refs/heads/master 4394b89ef -> 2d6b19953


SAMZA-1826: Fix unit test failure in table tests

The way that we verify the join counts in PageViewToProfileJoinFunction across 
multiple table tests (i.e. TestLocalTable, TestRemoteTable, 
TestLocalTableWithSideInputs) creates some conflicts in the static counter map 
and triggers test failure in certain sequence of ordering of tests. Fixing it 
by requiring explicit name of the join functions in tests and register / verify 
by unique op names.

Author: Yi Pan (Data Infrastructure) <[email protected]>

Closes #617 from nickpan47/SAMZA-1826 and squashes the following commits:

a25c484d [Yi Pan (Data Infrastructure)] SAMZA-1826: removing assertion on 
internal state of MapFunction in integration tests
566f13c5 [Yi Pan (Data Infrastructure)] SAMZA-1826: Fix unit test failure in 
table tests


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2d6b1995
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2d6b1995
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2d6b1995

Branch: refs/heads/master
Commit: 2d6b19953a3660f7e2aed3d59b98b1c161333a40
Parents: 4394b89
Author: Yi Pan (Data Infrastructure) <[email protected]>
Authored: Sun Aug 26 18:51:12 2018 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Sun Aug 26 18:51:12 2018 -0700

----------------------------------------------------------------------
 .../table/PageViewToProfileJoinFunction.java    | 48 ++++++++++++++++++++
 .../apache/samza/test/table/TestLocalTable.java | 47 +------------------
 .../table/TestLocalTableWithSideInputs.java     | 38 ++++++++--------
 .../samza/test/table/TestRemoteTable.java       | 37 +++++++--------
 4 files changed, 89 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2d6b1995/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java
 
b/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java
new file mode 100644
index 0000000..d253284
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/table/PageViewToProfileJoinFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.table;
+
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import org.apache.samza.test.table.TestTableData.PageView;
+import org.apache.samza.test.table.TestTableData.Profile;
+
+/**
+ * A {@link StreamTableJoinFunction} used by unit tests in this package
+ */
+class PageViewToProfileJoinFunction implements StreamTableJoinFunction
+    <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {
+
+  @Override
+  public TestTableData.EnrichedPageView apply(KV<Integer, 
TestTableData.PageView> m, KV<Integer, TestTableData.Profile> r) {
+    return r == null ? null : new 
TestTableData.EnrichedPageView(m.getValue().getPageKey(), m.getKey(), 
r.getValue().getCompany());
+  }
+
+  @Override
+  public Integer getMessageKey(KV<Integer, TestTableData.PageView> message) {
+    return message.getKey();
+  }
+
+  @Override
+  public Integer getRecordKey(KV<Integer, TestTableData.Profile> record) {
+    return record.getKey();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2d6b1995/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index 14ef751..e5775f0 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -25,8 +25,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -42,7 +40,6 @@ import org.apache.samza.metrics.Timer;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
@@ -57,17 +54,13 @@ import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.Table;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
-import org.apache.samza.test.table.TestTableData.EnrichedPageView;
-import org.apache.samza.test.table.TestTableData.PageView;
-import org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
-import org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
-import org.apache.samza.test.table.TestTableData.Profile;
-import org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
 import org.apache.samza.test.util.ArraySystemFactory;
 import org.apache.samza.test.util.Base64Serializer;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.samza.test.table.TestTableData.*;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -250,9 +243,6 @@ public class TestLocalTable extends 
AbstractIntegrationTestHarness {
       assertEquals(count * partitionCount, sentToProfileTable1.size());
       assertEquals(count * partitionCount, sentToProfileTable2.size());
 
-      for (int i = 0; i < PageViewToProfileJoinFunction.seqNo; i++) {
-        assertEquals(count * partitionCount, 
PageViewToProfileJoinFunction.counterPerJoinFn.get(i).intValue());
-      }
       assertEquals(count * partitionCount, joinedPageViews1.size());
       assertEquals(count * partitionCount, joinedPageViews2.size());
       assertTrue(joinedPageViews1.get(0) instanceof EnrichedPageView);
@@ -345,39 +335,6 @@ public class TestLocalTable extends 
AbstractIntegrationTestHarness {
     }
   }
 
-  static class PageViewToProfileJoinFunction implements StreamTableJoinFunction
-      <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> 
{
-    private static Map<Integer, AtomicInteger> counterPerJoinFn = new 
HashMap<>();
-    private static int seqNo = 0;
-    private final int currentSeqNo;
-
-    public PageViewToProfileJoinFunction() {
-      this.currentSeqNo = seqNo++;
-    }
-
-    @Override
-    public void init(Config config, TaskContext context) {
-      counterPerJoinFn.put(this.currentSeqNo, new AtomicInteger(0));
-    }
-
-    @Override
-    public EnrichedPageView apply(KV<Integer, PageView> m, KV<Integer, 
Profile> r) {
-      counterPerJoinFn.get(this.currentSeqNo).incrementAndGet();
-      return r == null ? null :
-          new EnrichedPageView(m.getValue().getPageKey(), m.getKey(), 
r.getValue().getCompany());
-    }
-
-    @Override
-    public Integer getMessageKey(KV<Integer, PageView> message) {
-      return message.getKey();
-    }
-
-    @Override
-    public Integer getRecordKey(KV<Integer, Profile> record) {
-      return record.getKey();
-    }
-  }
-
   @Test
   public void testAsyncOperation() throws Exception {
     KeyValueStore kvStore = mock(KeyValueStore.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/2d6b1995/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index d9016b1..1e45f5e 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -46,8 +46,10 @@ import 
org.apache.samza.test.framework.stream.CollectionStream;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.apache.samza.test.table.TestTableData.*;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestLocalTableWithSideInputs extends 
AbstractIntegrationTestHarness {
   private static final String PAGEVIEW_STREAM = "pageview";
@@ -72,20 +74,20 @@ public class TestLocalTableWithSideInputs extends 
AbstractIntegrationTestHarness
         Arrays.asList(TestTableData.generateProfiles(5)));
   }
 
-  private void runTest(String systemName, StreamApplication app, 
List<TestTableData.PageView> pageViews,
-      List<TestTableData.Profile> profiles) {
+  private void runTest(String systemName, StreamApplication app, 
List<PageView> pageViews,
+      List<Profile> profiles) {
     Map<String, String> configs = new HashMap<>();
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
PAGEVIEW_STREAM), systemName);
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
PROFILE_STREAM), systemName);
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
ENRICHED_PAGEVIEW_STREAM), systemName);
     configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName);
 
-    CollectionStream<TestTableData.PageView> pageViewStream =
+    CollectionStream<PageView> pageViewStream =
         CollectionStream.of(systemName, PAGEVIEW_STREAM, pageViews);
-    CollectionStream<TestTableData.Profile> profileStream =
+    CollectionStream<Profile> profileStream =
         CollectionStream.of(systemName, PROFILE_STREAM, profiles);
 
-    CollectionStream<TestTableData.EnrichedPageView> outputStream =
+    CollectionStream<EnrichedPageView> outputStream =
         CollectionStream.empty(systemName, ENRICHED_PAGEVIEW_STREAM);
 
     TestRunner
@@ -97,15 +99,15 @@ public class TestLocalTableWithSideInputs extends 
AbstractIntegrationTestHarness
         .run(Duration.ofMillis(100000));
 
     try {
-      Map<Integer, List<TestTableData.EnrichedPageView>> result = 
TestRunner.consumeStream(outputStream, Duration.ofMillis(1000));
-      List<TestTableData.EnrichedPageView> results = result.values().stream()
+      Map<Integer, List<EnrichedPageView>> result = 
TestRunner.consumeStream(outputStream, Duration.ofMillis(1000));
+      List<EnrichedPageView> results = result.values().stream()
           .flatMap(List::stream)
           .collect(Collectors.toList());
 
-      List<TestTableData.EnrichedPageView> expectedEnrichedPageviews = 
pageViews.stream()
+      List<EnrichedPageView> expectedEnrichedPageviews = pageViews.stream()
           .flatMap(pv -> profiles.stream()
               .filter(profile -> pv.memberId == profile.memberId)
-              .map(profile -> new TestTableData.EnrichedPageView(pv.pageKey, 
profile.memberId, profile.company)))
+              .map(profile -> new EnrichedPageView(pv.pageKey, 
profile.memberId, profile.company)))
           .collect(Collectors.toList());
 
       boolean successfulJoin = 
results.stream().allMatch(expectedEnrichedPageviews::contains);
@@ -127,16 +129,16 @@ public class TestLocalTableWithSideInputs extends 
AbstractIntegrationTestHarness
 
       graph.getInputStream(PAGEVIEW_STREAM, new 
NoOpSerde<TestTableData.PageView>())
           .partitionBy(TestTableData.PageView::getMemberId, v -> v, 
"partition-page-view")
-          .join(table, new TestLocalTable.PageViewToProfileJoinFunction())
+          .join(table, new PageViewToProfileJoinFunction())
           .sendTo(graph.getOutputStream(ENRICHED_PAGEVIEW_STREAM, new 
NoOpSerde<>()));
     }
 
-    protected TableDescriptor<Integer, TestTableData.Profile, ?> 
getTableDescriptor() {
-      return new InMemoryTableDescriptor<Integer, 
TestTableData.Profile>(PROFILE_TABLE)
-          .withSerde(KVSerde.of(new IntegerSerde(), new 
TestTableData.ProfileJsonSerde()))
+    protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
+      return new InMemoryTableDescriptor<Integer, Profile>(PROFILE_TABLE)
+          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
           .withSideInputs(ImmutableList.of(PROFILE_STREAM))
           .withSideInputsProcessor((msg, store) -> {
-              TestTableData.Profile profile = (TestTableData.Profile) 
msg.getMessage();
+              Profile profile = (Profile) msg.getMessage();
               int key = profile.getMemberId();
 
               return ImmutableList.of(new Entry<>(key, profile));
@@ -146,9 +148,9 @@ public class TestLocalTableWithSideInputs extends 
AbstractIntegrationTestHarness
 
   static class DurablePageViewProfileJoin extends PageViewProfileJoin {
     @Override
-    protected TableDescriptor<Integer, TestTableData.Profile, ?> 
getTableDescriptor() {
-      return new RocksDbTableDescriptor<Integer, 
TestTableData.Profile>(PROFILE_TABLE)
-          .withSerde(KVSerde.of(new IntegerSerde(), new 
TestTableData.ProfileJsonSerde()))
+    protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
+      return new RocksDbTableDescriptor<Integer, Profile>(PROFILE_TABLE)
+          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
           .withSideInputs(ImmutableList.of(PROFILE_STREAM))
           .withSideInputsProcessor((msg, store) -> {
               TestTableData.Profile profile = (TestTableData.Profile) 
msg.getMessage();

http://git-wip-us.apache.org/repos/asf/samza/blob/2d6b1995/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java 
b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index 2d07b01..eb9fbe9 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -57,10 +57,11 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.Base64Serializer;
 import org.apache.samza.util.RateLimiter;
+import com.google.common.cache.CacheBuilder;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.cache.CacheBuilder;
+import static org.apache.samza.test.table.TestTableData.*;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -70,11 +71,11 @@ import static org.mockito.Mockito.mock;
 
 public class TestRemoteTable extends AbstractIntegrationTestHarness {
 
-  static Map<String, List<TestTableData.EnrichedPageView>> writtenRecords = 
new HashMap<>();
+  static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
 
-  static class InMemoryReadFunction implements TableReadFunction<Integer, 
TestTableData.Profile> {
+  static class InMemoryReadFunction implements TableReadFunction<Integer, 
Profile> {
     private final String serializedProfiles;
-    private transient Map<Integer, TestTableData.Profile> profileMap;
+    private transient Map<Integer, Profile> profileMap;
 
     private InMemoryReadFunction(String profiles) {
       this.serializedProfiles = profiles;
@@ -82,12 +83,12 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
 
     private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
       in.defaultReadObject();
-      TestTableData.Profile[] profiles = 
Base64Serializer.deserialize(this.serializedProfiles, 
TestTableData.Profile[].class);
+      Profile[] profiles = 
Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
       this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> 
p.getMemberId(), Function.identity()));
     }
 
     @Override
-    public CompletableFuture<TestTableData.Profile> getAsync(Integer key) {
+    public CompletableFuture<Profile> getAsync(Integer key) {
       return CompletableFuture.completedFuture(profileMap.get(key));
     }
 
@@ -96,8 +97,8 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
     }
   }
 
-  static class InMemoryWriteFunction implements TableWriteFunction<Integer, 
TestTableData.EnrichedPageView> {
-    private transient List<TestTableData.EnrichedPageView> records;
+  static class InMemoryWriteFunction implements TableWriteFunction<Integer, 
EnrichedPageView> {
+    private transient List<EnrichedPageView> records;
     private String testName;
 
     public InMemoryWriteFunction(String testName) {
@@ -113,7 +114,7 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
     }
 
     @Override
-    public CompletableFuture<Void> putAsync(Integer key, 
TestTableData.EnrichedPageView record) {
+    public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView 
record) {
       records.add(record);
       return CompletableFuture.completedFuture(null);
     }
@@ -147,8 +148,8 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
     writtenRecords.put(testName, new ArrayList<>());
 
     int count = 10;
-    TestTableData.PageView[] pageViews = 
TestTableData.generatePageViews(count);
-    String profiles = 
Base64Serializer.serialize(TestTableData.generateProfiles(count));
+    PageView[] pageViews = generatePageViews(count);
+    String profiles = Base64Serializer.serialize(generateProfiles(count));
 
     int partitionCount = 4;
     Map<String, String> configs = 
TestLocalTable.getBaseJobConfig(bootstrapUrl(), zkConnect());
@@ -161,32 +162,32 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
     final RateLimiter writeRateLimiter = mock(RateLimiter.class);
     final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
     final StreamApplication app = (streamGraph, cfg) -> {
-      RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = 
new RemoteTableDescriptor<>("profile-table-1");
+      RemoteTableDescriptor<Integer, Profile> inputTableDesc = new 
RemoteTableDescriptor<>("profile-table-1");
       inputTableDesc
           
.withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles))
           .withRateLimiter(readRateLimiter, null, null);
 
-      RemoteTableDescriptor<Integer, TestTableData.EnrichedPageView> 
outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
+      RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new 
RemoteTableDescriptor<>("enriched-page-view-table-1");
       outputTableDesc
           .withReadFunction(key -> null) // dummy reader
           .withWriteFunction(writer)
           .withRateLimiter(writeRateLimiter, null, null);
 
-      Table<KV<Integer, TestTableData.EnrichedPageView>> outputTable = 
streamGraph.getTable(outputTableDesc);
+      Table<KV<Integer, EnrichedPageView>> outputTable = 
streamGraph.getTable(outputTableDesc);
 
       if (withCache) {
         outputTable = getCachingTable(outputTable, defaultCache, "output", 
streamGraph);
       }
 
-      Table<KV<Integer, TestTableData.Profile>> inputTable = 
streamGraph.getTable(inputTableDesc);
+      Table<KV<Integer, Profile>> inputTable = 
streamGraph.getTable(inputTableDesc);
 
       if (withCache) {
         inputTable = getCachingTable(inputTable, defaultCache, "input", 
streamGraph);
       }
 
-      streamGraph.getInputStream("PageView", new 
NoOpSerde<TestTableData.PageView>())
+      streamGraph.getInputStream("PageView", new NoOpSerde<PageView>())
           .map(pv -> new KV<>(pv.getMemberId(), pv))
-          .join(inputTable, new TestLocalTable.PageViewToProfileJoinFunction())
+          .join(inputTable, new PageViewToProfileJoinFunction())
           .map(m -> new KV(m.getMemberId(), m))
           .sendTo(outputTable);
     };
@@ -196,7 +197,7 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
 
     int numExpected = count * partitionCount;
     Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
-    Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof 
TestTableData.EnrichedPageView);
+    Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof 
EnrichedPageView);
   }
 
   @Test

Reply via email to