Repository: samza
Updated Branches:
  refs/heads/master ad80cf9f1 -> a671288e1


http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
index 62304f3..7677826 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java
@@ -54,30 +54,30 @@ public class TestTimeSeriesStoreImpl {
 
     // read from time-range
     List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, 
"hello", 0L, 1L);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
 
     // read from time-range [1,2) should return one entry
     values = readStore(timeSeriesStore, "hello", 1L, 2L);
-    Assert.assertEquals(values.size(), 1);
-    Assert.assertEquals(new String(values.get(0).getValue()), "world-1");
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals("world-1", new String(values.get(0).getValue()));
 
     // read from time-range [2,3) should return two entries
     values = readStore(timeSeriesStore, "hello", 2L, 3L);
-    Assert.assertEquals(values.size(), 2);
-    Assert.assertEquals(new String(values.get(0).getValue()), "world-1");
-    Assert.assertEquals(values.get(0).getTimestamp(), new Long(2));
+    Assert.assertEquals(2, values.size());
+    Assert.assertEquals("world-1", new String(values.get(0).getValue()));
+    Assert.assertEquals(2L, values.get(0).getTimestamp());
 
     // read from time-range [0,3) should return three entries
     values = readStore(timeSeriesStore, "hello", 0L, 3L);
-    Assert.assertEquals(values.size(), 3);
+    Assert.assertEquals(3, values.size());
 
     // read from time-range [2,999999) should return two entries
     values = readStore(timeSeriesStore, "hello", 2L, 999999L);
-    Assert.assertEquals(values.size(), 2);
+    Assert.assertEquals(2, values.size());
 
     // read from time-range [3,4) should return no entries
     values = readStore(timeSeriesStore, "hello", 3L, 4L);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
   }
 
   @Test
@@ -87,11 +87,11 @@ public class TestTimeSeriesStoreImpl {
 
     // read from a non-existent key
     List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, 
"non-existent-key", 0, Integer.MAX_VALUE);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
 
     // read from an existing key but out of range timestamp
     values = readStore(timeSeriesStore, "hello", 2, Integer.MAX_VALUE);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
   }
 
   @Test
@@ -106,21 +106,21 @@ public class TestTimeSeriesStoreImpl {
 
     // read from time-range [0,2) should return 100 entries
     List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, 
"hello", 0L, 2L);
-    Assert.assertEquals(values.size(), 100);
+    Assert.assertEquals(100, values.size());
     values.forEach(timeSeriesValue -> {
-        Assert.assertEquals(new String(timeSeriesValue.getValue()), "world-1");
+        Assert.assertEquals("world-1", new String(timeSeriesValue.getValue()));
       });
 
     // read from time-range [2,4) should return 100 entries
     values = readStore(timeSeriesStore, "hello", 2L, 4L);
-    Assert.assertEquals(values.size(), 100);
+    Assert.assertEquals(100, values.size());
     values.forEach(timeSeriesValue -> {
-        Assert.assertEquals(new String(timeSeriesValue.getValue()), "world-2");
+        Assert.assertEquals("world-2", new String(timeSeriesValue.getValue()));
       });
 
     // read all entries in the store
     values = readStore(timeSeriesStore, "hello", 0L, Integer.MAX_VALUE);
-    Assert.assertEquals(values.size(), 200);
+    Assert.assertEquals(200, values.size());
   }
 
   @Test
@@ -135,30 +135,30 @@ public class TestTimeSeriesStoreImpl {
 
     // read from time-range
     List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, 
"hello", 0L, 1L);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
 
     // read from time-range [1,2) should return one entry
     values = readStore(timeSeriesStore, "hello", 1L, 2L);
-    Assert.assertEquals(values.size(), 1);
-    Assert.assertEquals(new String(values.get(0).getValue()), "world-1");
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals("world-1", new String(values.get(0).getValue()));
 
     // read from time-range [2,3) should return the most recent entry
     values = readStore(timeSeriesStore, "hello", 2L, 3L);
-    Assert.assertEquals(values.size(), 1);
-    Assert.assertEquals(new String(values.get(0).getValue()), "world-2");
-    Assert.assertEquals(values.get(0).getTimestamp(), new Long(2));
+    Assert.assertEquals(1, values.size());
+    Assert.assertEquals("world-2", new String(values.get(0).getValue()));
+    Assert.assertEquals(2L, values.get(0).getTimestamp());
 
     // read from time-range [0,3) should return two entries
     values = readStore(timeSeriesStore, "hello", 0L, 3L);
-    Assert.assertEquals(values.size(), 2);
+    Assert.assertEquals(2, values.size());
 
     // read from time-range [2,999999) should return one entry
     values = readStore(timeSeriesStore, "hello", 2L, 999999L);
-    Assert.assertEquals(values.size(), 1);
+    Assert.assertEquals(1, values.size());
 
     // read from time-range [3,4) should return no entries
     values = readStore(timeSeriesStore, "hello", 3L, 4L);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
   }
 
   @Test
@@ -172,11 +172,11 @@ public class TestTimeSeriesStoreImpl {
     timeSeriesStore.put("hello", "world-2".getBytes(), 2L);
 
     List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, 
"hello", 1L, 3L);
-    Assert.assertEquals(values.size(), 2);
+    Assert.assertEquals(2, values.size());
 
     timeSeriesStore.remove("hello", 0L, 3L);
     values = readStore(timeSeriesStore, "hello", 1L, 3L);
-    Assert.assertEquals(values.size(), 0);
+    Assert.assertEquals(0, values.size());
   }
 
   private static <K, V> List<TimestampedValue<V>> readStore(TimeSeriesStore<K, 
V> store, K key, long startTimestamp, long endTimestamp) {

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
new file mode 100644
index 0000000..40015ec
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl.store;
+
+import org.apache.samza.serializers.ByteSerde;
+import org.apache.samza.serializers.IntegerSerde;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+public class TestTimestampedValueSerde {
+
+  @Test
+  public void testEmptyValueDeserialization() {
+    byte[] bytesWithNoValue = new byte[8];
+    ByteBuffer.wrap(bytesWithNoValue).putLong(1234L);
+    TimestampedValueSerde<byte[]> timestampedValueSerde = new 
TimestampedValueSerde<>(new ByteSerde());
+    TimestampedValue<byte[]> timestampedValue = 
timestampedValueSerde.fromBytes(bytesWithNoValue);
+    assertEquals(1234L, timestampedValue.getTimestamp());
+    assertEquals(0, timestampedValue.getValue().length);
+  }
+
+  @Test
+  public void testEmptyValueSerialization() {
+    byte[] expectedBytes = new byte[8];
+    ByteBuffer.wrap(expectedBytes).putLong(1234L);
+
+    TimestampedValueSerde<Integer> timestampedValueSerde = new 
TimestampedValueSerde<>(new IntegerSerde());
+    TimestampedValue<Integer> timestampedValue = new TimestampedValue<>(null, 
1234L);
+    assertTrue(Arrays.equals(expectedBytes, 
timestampedValueSerde.toBytes(timestampedValue)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java 
b/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
deleted file mode 100644
index 2a8f039..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.test.operator;
-
-
-class PageView {
-  private String userId;
-  private String country;
-  private String url;
-
-  public String getUserId() {
-    return userId;
-  }
-
-  public String getCountry() {
-    return country;
-  }
-
-  public String getUrl() {
-    return url;
-  }
-
-  public void setUserId(String userId) {
-    this.userId = userId;
-  }
-
-  public void setCountry(String country) {
-    this.country = country;
-  }
-
-  public void setUrl(String url) {
-    this.url = url;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
new file mode 100644
index 0000000..517d81f
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.operator;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.test.operator.data.AdClick;
+import org.apache.samza.test.operator.data.PageView;
+import org.apache.samza.test.operator.data.UserPageAdClick;
+
+import java.time.Duration;
+
+/**
+ * A {@link StreamApplication} that demonstrates a partitionBy, stream-stream 
join and a windowed count.
+ */
+public class RepartitionJoinWindowApp implements StreamApplication {
+  static final String PAGE_VIEWS = "page-views";
+  static final String AD_CLICKS = "ad-clicks";
+  static final String OUTPUT_TOPIC = "user-ad-click-counts";
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, new 
JsonSerdeV2<>(PageView.class));
+    MessageStream<AdClick> adClicks = graph.getInputStream(AD_CLICKS, new 
JsonSerdeV2<>(AdClick.class));
+    OutputStream<KV<String, String>> outputStream =
+        graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), 
new StringSerde()));
+
+    MessageStream<PageView> pageViewsRepartitionedByViewId = pageViews
+        .partitionBy(PageView::getViewId, pv -> pv, new KVSerde<>(new 
StringSerde(), new JsonSerdeV2<>(PageView.class)))
+        .map(KV::getValue);
+
+    MessageStream<AdClick> adClicksRepartitionedByViewId = adClicks
+        .partitionBy(AdClick::getViewId, ac -> ac, new KVSerde<>(new 
StringSerde(), new JsonSerdeV2<>(AdClick.class)))
+        .map(KV::getValue);
+
+    MessageStream<UserPageAdClick> userPageAdClicks = 
pageViewsRepartitionedByViewId
+        .join(adClicksRepartitionedByViewId, new UserPageViewAdClicksJoiner(),
+            new StringSerde(), new JsonSerdeV2<>(PageView.class), new 
JsonSerdeV2<>(AdClick.class),
+            Duration.ofMinutes(1));
+
+    userPageAdClicks
+        .partitionBy(UserPageAdClick::getUserId, upac -> upac,
+            KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>(UserPageAdClick.class)))
+        .map(KV::getValue)
+        .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, 
Duration.ofSeconds(3)))
+        .map(windowPane -> KV.of(windowPane.getKey().getKey(), 
String.valueOf(windowPane.getMessage().size())))
+        .sendTo(outputStream);
+  }
+
+  private static class UserPageViewAdClicksJoiner implements 
JoinFunction<String, PageView, AdClick, UserPageAdClick> {
+    @Override
+    public UserPageAdClick apply(PageView pv, AdClick ac) {
+      return new UserPageAdClick(pv.getUserId(), pv.getPageId(), ac.getAdId());
+    }
+
+    @Override
+    public String getFirstKey(PageView pv) {
+      return pv.getViewId();
+    }
+
+    @Override
+    public String getSecondKey(AdClick ac) {
+      return ac.getViewId();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
deleted file mode 100644
index 261b954..0000000
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.operator;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-
-import java.time.Duration;
-
-/**
- * A {@link StreamApplication} that demonstrates a partitionBy followed by a 
windowed count.
- */
-public class RepartitionWindowApp implements StreamApplication {
-  static final String INPUT_TOPIC = "page-views";
-  static final String OUTPUT_TOPIC = "page-view-counts";
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new 
JsonSerdeV2<>(PageView.class));
-
-    OutputStream<KV<String, String>> outputStream =
-        graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), 
new StringSerde()));
-
-    pageViews
-        .partitionBy(PageView::getUserId, pv -> pv, new KVSerde<>(new 
StringSerde(), new JsonSerdeV2<>(PageView.class)))
-        .window(Windows.keyedSessionWindow(KV::getKey, Duration.ofSeconds(3)))
-        .map(windowPane -> KV.of(windowPane.getKey().getKey(), 
String.valueOf(windowPane.getMessage().size())))
-        .sendTo(outputStream);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java 
b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
index 4c83960..974cafc 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
@@ -30,6 +30,7 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.test.operator.data.PageView;
 
 import java.time.Duration;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
index 9bb66ad..db46982 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
@@ -213,7 +213,6 @@ public class StreamApplicationIntegrationTestHarness 
extends AbstractIntegration
    * @param overriddenConfigs configs to override
    */
   public void runApplication(StreamApplication streamApplication, String 
appName, Config overriddenConfigs) {
-
     Map<String, String> configs = new HashMap<>();
     configs.put("job.factory.class", 
"org.apache.samza.job.local.ThreadJobFactory");
     configs.put("job.name", appName);
@@ -231,6 +230,17 @@ public class StreamApplicationIntegrationTestHarness 
extends AbstractIntegration
     configs.put("job.coordinator.replication.factor", "1");
     configs.put("task.window.ms", "1000");
 
+    // This is to prevent tests from taking a long time to stop after they're 
done. The issue is that
+    // tearDown currently doesn't call runner.kill(app), and shuts down the 
Kafka and ZK servers immediately.
+    // The test process then exits, triggering the SamzaContainer shutdown 
hook, which in turn tries to flush any
+    // store changelogs, which then get stuck trying to produce to the stopped 
Kafka server.
+    // Calling runner.kill doesn't work since RemoteApplicationRunner creates 
a new ThreadJob instance when
+    // kill is called. We can't use LocalApplicationRunner since 
ZkJobCoordinator doesn't currently create
+    // changelog streams. Hence we just force an unclean shutdown here to. 
This _should be_ OK
+    // since the test method has already executed by the time the shutdown 
hook is called. The side effect is
+    // that buffered state (e.g. changelog contents) might not be flushed 
correctly after the test run.
+    configs.put("task.shutdown.ms", "1");
+
     if (overriddenConfigs != null) {
       configs.putAll(overriddenConfigs);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
new file mode 100644
index 0000000..117f97b
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.samza.test.operator.RepartitionJoinWindowApp.AD_CLICKS;
+import static 
org.apache.samza.test.operator.RepartitionJoinWindowApp.PAGE_VIEWS;
+import static 
org.apache.samza.test.operator.RepartitionJoinWindowApp.OUTPUT_TOPIC;
+
+/**
+ * Test driver for {@link RepartitionJoinWindowApp}.
+ */
+public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTestHarness {
+  private static final String APP_NAME = "UserPageAdClickCounter";
+
+  @Test
+  public void testRepartitionJoinWindowApp() throws Exception {
+    // create topics
+    createTopic(PAGE_VIEWS, 2);
+    createTopic(AD_CLICKS, 2);
+    createTopic(OUTPUT_TOPIC, 1);
+
+    // create events for the following user activity.
+    // userId: (viewId, pageId, (adIds))
+    // u1: (v1, p1, (a1, a2)), (v2, p2, (a3))
+    // u2: (v3, p1, (a1, a2, a4)), (v4, p3, (a5))
+    produceMessage(PAGE_VIEWS, 0, "p1", 
"{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 1, "p2", 
"{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 0, "p1", 
"{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
+    produceMessage(PAGE_VIEWS, 1, "p3", 
"{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
+
+    produceMessage(AD_CLICKS, 0, "a1", "{\"viewId\":\"v1\",\"adId\":\"a1\"}");
+    produceMessage(AD_CLICKS, 1, "a2", "{\"viewId\":\"v1\",\"adId\":\"a2\"}");
+    produceMessage(AD_CLICKS, 0, "a3", "{\"viewId\":\"v2\",\"adId\":\"a3\"}");
+    produceMessage(AD_CLICKS, 0, "a1", "{\"viewId\":\"v3\",\"adId\":\"a1\"}");
+    produceMessage(AD_CLICKS, 1, "a2", "{\"viewId\":\"v3\",\"adId\":\"a2\"}");
+    produceMessage(AD_CLICKS, 1, "a4", "{\"viewId\":\"v3\",\"adId\":\"a4\"}");
+    produceMessage(AD_CLICKS, 0, "a5", "{\"viewId\":\"v4\",\"adId\":\"a5\"}");
+
+    // run the application
+    RepartitionJoinWindowApp app = new RepartitionJoinWindowApp();
+    runApplication(app, APP_NAME, null);
+
+    // consume and validate result
+    List<ConsumerRecord<String, String>> messages = 
consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);
+    Assert.assertEquals(2, messages.size());
+
+    for (ConsumerRecord<String, String> message : messages) {
+      String key = message.key();
+      String value = message.value();
+      Assert.assertTrue(key.equals("u1") || key.equals("u2"));
+      if ("u1".equals(key)) {
+        Assert.assertEquals("3", value);
+      } else {
+        Assert.assertEquals("4", value);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
deleted file mode 100644
index 3745541..0000000
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.test.operator;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.samza.test.operator.RepartitionWindowApp.INPUT_TOPIC;
-import static org.apache.samza.test.operator.RepartitionWindowApp.OUTPUT_TOPIC;
-
-/**
- * Test driver for {@link RepartitionWindowApp}.
- */
-public class TestRepartitionWindowApp extends 
StreamApplicationIntegrationTestHarness {
-  private static final String APP_NAME = "RepartitionedSessionizer";
-
-  @Test
-  public void testRepartitionedSessionWindowCounter() throws Exception {
-    // create topics
-    createTopic(INPUT_TOPIC, 3);
-    createTopic(OUTPUT_TOPIC, 1);
-
-    // produce messages to different partitions.
-    produceMessage(INPUT_TOPIC, 0, "userId1", "{\"userId\":\"userId1\", 
\"country\":\"india\",\"url\":\"5.com\"}");
-    produceMessage(INPUT_TOPIC, 1, "userId2", "{\"userId\":\"userId2\", 
\"country\":\"china\",\"url\":\"4.com\"}");
-    produceMessage(INPUT_TOPIC, 2, "userId1", "{\"userId\":\"userId1\", 
\"country\":\"india\",\"url\":\"1.com\"}");
-    produceMessage(INPUT_TOPIC, 0, "userId1", "{\"userId\":\"userId1\", 
\"country\":\"india\",\"url\":\"2.com\"}");
-    produceMessage(INPUT_TOPIC, 1, "userId1", "{\"userId\":\"userId1\", 
\"country\":\"india\",\"url\":\"3.com\"}");
-
-    // run the application
-    RepartitionWindowApp app = new RepartitionWindowApp();
-    runApplication(app, APP_NAME, null);
-
-    // consume and validate result
-    List<ConsumerRecord<String, String>> messages = 
consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);
-    Assert.assertEquals(messages.size(), 2);
-
-    for (ConsumerRecord<String, String> message : messages) {
-      String key = message.key();
-      String value = message.value();
-      // Assert that there are 4 messages for userId1 and 1 message for 
userId2.
-      Assert.assertTrue(key.equals("userId1") || key.equals("userId2"));
-      if ("userId1".equals(key)) {
-        Assert.assertEquals("4", value);
-      } else {
-        Assert.assertEquals("1", value);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
index 3f3e615..151c9d1 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
@@ -30,6 +30,7 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.test.operator.data.PageView;
 
 import java.time.Duration;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java 
b/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java
new file mode 100644
index 0000000..ee699ae
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator.data;
+
+
+public class AdClick {
+  private String viewId;
+  private String adId;
+
+  public String getViewId() {
+    return viewId;
+  }
+
+  public void setViewId(String viewId) {
+    this.viewId = viewId;
+  }
+
+  public String getAdId() {
+    return adId;
+  }
+
+  public void setAdId(String adId) {
+    this.adId = adId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java 
b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
new file mode 100644
index 0000000..d2cebf9
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator.data;
+
+
+public class PageView {
+  private String viewId;
+  private String pageId;
+  private String userId;
+
+  public String getViewId() {
+    return viewId;
+  }
+
+  public void setViewId(String viewId) {
+    this.viewId = viewId;
+  }
+
+  public String getPageId() {
+    return pageId;
+  }
+
+  public void setPageId(String pageId) {
+    this.pageId = pageId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public void setUserId(String userId) {
+    this.userId = userId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java
new file mode 100644
index 0000000..e5f7b53
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator.data;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+public class UserPageAdClick {
+  private String userId;
+  private String pageId;
+  private String adId;
+
+  @JsonCreator
+  public UserPageAdClick(
+      @JsonProperty("userId") String userId,
+      @JsonProperty("pageId") String pageId,
+      @JsonProperty("adId") String adId) {
+    this.userId = userId;
+    this.pageId = pageId;
+    this.adId = adId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public void setUserId(String userId) {
+    this.userId = userId;
+  }
+
+  public String getPageId() {
+    return pageId;
+  }
+
+  public void setPageId(String pageId) {
+    this.pageId = pageId;
+  }
+
+  public String getAdId() {
+    return adId;
+  }
+
+  public void setAdId(String adId) {
+    this.adId = adId;
+  }
+}

Reply via email to