Repository: samza Updated Branches: refs/heads/master ad80cf9f1 -> a671288e1
http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java index 62304f3..7677826 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java @@ -54,30 +54,30 @@ public class TestTimeSeriesStoreImpl { // read from time-range List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 1L); - Assert.assertEquals(values.size(), 0); + Assert.assertEquals(0, values.size()); // read from time-range [1,2) should return one entry values = readStore(timeSeriesStore, "hello", 1L, 2L); - Assert.assertEquals(values.size(), 1); - Assert.assertEquals(new String(values.get(0).getValue()), "world-1"); + Assert.assertEquals(1, values.size()); + Assert.assertEquals("world-1", new String(values.get(0).getValue())); // read from time-range [2,3) should return two entries values = readStore(timeSeriesStore, "hello", 2L, 3L); - Assert.assertEquals(values.size(), 2); - Assert.assertEquals(new String(values.get(0).getValue()), "world-1"); - Assert.assertEquals(values.get(0).getTimestamp(), new Long(2)); + Assert.assertEquals(2, values.size()); + Assert.assertEquals("world-1", new String(values.get(0).getValue())); + Assert.assertEquals(2L, values.get(0).getTimestamp()); // read from time-range [0,3) should return three entries values = readStore(timeSeriesStore, "hello", 0L, 3L); - Assert.assertEquals(values.size(), 3); + Assert.assertEquals(3, values.size()); // read from time-range [2,999999) should return two entries values = readStore(timeSeriesStore, "hello", 2L, 
999999L); - Assert.assertEquals(values.size(), 2); + Assert.assertEquals(2, values.size()); // read from time-range [3,4) should return no entries values = readStore(timeSeriesStore, "hello", 3L, 4L); - Assert.assertEquals(values.size(), 0); + Assert.assertEquals(0, values.size()); } @Test @@ -87,11 +87,11 @@ public class TestTimeSeriesStoreImpl { // read from a non-existent key List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "non-existent-key", 0, Integer.MAX_VALUE); - Assert.assertEquals(values.size(), 0); + Assert.assertEquals(0, values.size()); // read from an existing key but out of range timestamp values = readStore(timeSeriesStore, "hello", 2, Integer.MAX_VALUE); - Assert.assertEquals(values.size(), 0); + Assert.assertEquals(0, values.size()); } @Test @@ -106,21 +106,21 @@ public class TestTimeSeriesStoreImpl { // read from time-range [0,2) should return 100 entries List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 0L, 2L); - Assert.assertEquals(values.size(), 100); + Assert.assertEquals(100, values.size()); values.forEach(timeSeriesValue -> { - Assert.assertEquals(new String(timeSeriesValue.getValue()), "world-1"); + Assert.assertEquals("world-1", new String(timeSeriesValue.getValue())); }); // read from time-range [2,4) should return 100 entries values = readStore(timeSeriesStore, "hello", 2L, 4L); - Assert.assertEquals(values.size(), 100); + Assert.assertEquals(100, values.size()); values.forEach(timeSeriesValue -> { - Assert.assertEquals(new String(timeSeriesValue.getValue()), "world-2"); + Assert.assertEquals("world-2", new String(timeSeriesValue.getValue())); }); // read all entries in the store values = readStore(timeSeriesStore, "hello", 0L, Integer.MAX_VALUE); - Assert.assertEquals(values.size(), 200); + Assert.assertEquals(200, values.size()); } @Test @@ -135,30 +135,30 @@ public class TestTimeSeriesStoreImpl { // read from time-range List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, 
"hello", 0L, 1L); - Assert.assertEquals(values.size(), 0); + Assert.assertEquals(0, values.size()); // read from time-range [1,2) should return one entry values = readStore(timeSeriesStore, "hello", 1L, 2L); - Assert.assertEquals(values.size(), 1); - Assert.assertEquals(new String(values.get(0).getValue()), "world-1"); + Assert.assertEquals(1, values.size()); + Assert.assertEquals("world-1", new String(values.get(0).getValue())); // read from time-range [2,3) should return the most recent entry values = readStore(timeSeriesStore, "hello", 2L, 3L); - Assert.assertEquals(values.size(), 1); - Assert.assertEquals(new String(values.get(0).getValue()), "world-2"); - Assert.assertEquals(values.get(0).getTimestamp(), new Long(2)); + Assert.assertEquals(1, values.size()); + Assert.assertEquals("world-2", new String(values.get(0).getValue())); + Assert.assertEquals(2L, values.get(0).getTimestamp()); // read from time-range [0,3) should return two entries values = readStore(timeSeriesStore, "hello", 0L, 3L); - Assert.assertEquals(values.size(), 2); + Assert.assertEquals(2, values.size()); // read from time-range [2,999999) should return one entry values = readStore(timeSeriesStore, "hello", 2L, 999999L); - Assert.assertEquals(values.size(), 1); + Assert.assertEquals(1, values.size()); // read from time-range [3,4) should return no entries values = readStore(timeSeriesStore, "hello", 3L, 4L); - Assert.assertEquals(values.size(), 0); + Assert.assertEquals(0, values.size()); } @Test @@ -172,11 +172,11 @@ public class TestTimeSeriesStoreImpl { timeSeriesStore.put("hello", "world-2".getBytes(), 2L); List<TimestampedValue<byte[]>> values = readStore(timeSeriesStore, "hello", 1L, 3L); - Assert.assertEquals(values.size(), 2); + Assert.assertEquals(2, values.size()); timeSeriesStore.remove("hello", 0L, 3L); values = readStore(timeSeriesStore, "hello", 1L, 3L); - Assert.assertEquals(values.size(), 0); + Assert.assertEquals(0, values.size()); } private static <K, V> 
List<TimestampedValue<V>> readStore(TimeSeriesStore<K, V> store, K key, long startTimestamp, long endTimestamp) { http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java new file mode 100644 index 0000000..40015ec --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.impl.store; + +import org.apache.samza.serializers.ByteSerde; +import org.apache.samza.serializers.IntegerSerde; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +public class TestTimestampedValueSerde { + + @Test + public void testEmptyValueDeserialization() { + byte[] bytesWithNoValue = new byte[8]; + ByteBuffer.wrap(bytesWithNoValue).putLong(1234L); + TimestampedValueSerde<byte[]> timestampedValueSerde = new TimestampedValueSerde<>(new ByteSerde()); + TimestampedValue<byte[]> timestampedValue = timestampedValueSerde.fromBytes(bytesWithNoValue); + assertEquals(1234L, timestampedValue.getTimestamp()); + assertEquals(0, timestampedValue.getValue().length); + } + + @Test + public void testEmptyValueSerialization() { + byte[] expectedBytes = new byte[8]; + ByteBuffer.wrap(expectedBytes).putLong(1234L); + + TimestampedValueSerde<Integer> timestampedValueSerde = new TimestampedValueSerde<>(new IntegerSerde()); + TimestampedValue<Integer> timestampedValue = new TimestampedValue<>(null, 1234L); + assertTrue(Arrays.equals(expectedBytes, timestampedValueSerde.toBytes(timestampedValue))); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java b/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java deleted file mode 100644 index 2a8f039..0000000 --- a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.test.operator; - - -class PageView { - private String userId; - private String country; - private String url; - - public String getUserId() { - return userId; - } - - public String getCountry() { - return country; - } - - public String getUrl() { - return url; - } - - public void setUserId(String userId) { - this.userId = userId; - } - - public void setCountry(String country) { - this.country = country; - } - - public void setUrl(String url) { - this.url = url; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java new file mode 100644 index 0000000..517d81f --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.operator; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.test.operator.data.AdClick; +import org.apache.samza.test.operator.data.PageView; +import org.apache.samza.test.operator.data.UserPageAdClick; + +import java.time.Duration; + +/** + * A {@link StreamApplication} that demonstrates a partitionBy, stream-stream join and a windowed count. 
+ */ +public class RepartitionJoinWindowApp implements StreamApplication { + static final String PAGE_VIEWS = "page-views"; + static final String AD_CLICKS = "ad-clicks"; + static final String OUTPUT_TOPIC = "user-ad-click-counts"; + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, new JsonSerdeV2<>(PageView.class)); + MessageStream<AdClick> adClicks = graph.getInputStream(AD_CLICKS, new JsonSerdeV2<>(AdClick.class)); + OutputStream<KV<String, String>> outputStream = + graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new StringSerde())); + + MessageStream<PageView> pageViewsRepartitionedByViewId = pageViews + .partitionBy(PageView::getViewId, pv -> pv, new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class))) + .map(KV::getValue); + + MessageStream<AdClick> adClicksRepartitionedByViewId = adClicks + .partitionBy(AdClick::getViewId, ac -> ac, new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(AdClick.class))) + .map(KV::getValue); + + MessageStream<UserPageAdClick> userPageAdClicks = pageViewsRepartitionedByViewId + .join(adClicksRepartitionedByViewId, new UserPageViewAdClicksJoiner(), + new StringSerde(), new JsonSerdeV2<>(PageView.class), new JsonSerdeV2<>(AdClick.class), + Duration.ofMinutes(1)); + + userPageAdClicks + .partitionBy(UserPageAdClick::getUserId, upac -> upac, + KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class))) + .map(KV::getValue) + .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3))) + .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size()))) + .sendTo(outputStream); + } + + private static class UserPageViewAdClicksJoiner implements JoinFunction<String, PageView, AdClick, UserPageAdClick> { + @Override + public UserPageAdClick apply(PageView pv, AdClick ac) { + return new UserPageAdClick(pv.getUserId(), pv.getPageId(), 
ac.getAdId()); + } + + @Override + public String getFirstKey(PageView pv) { + return pv.getViewId(); + } + + @Override + public String getSecondKey(AdClick ac) { + return ac.getViewId(); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java deleted file mode 100644 index 261b954..0000000 --- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.test.operator; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.StringSerde; - -import java.time.Duration; - -/** - * A {@link StreamApplication} that demonstrates a partitionBy followed by a windowed count. - */ -public class RepartitionWindowApp implements StreamApplication { - static final String INPUT_TOPIC = "page-views"; - static final String OUTPUT_TOPIC = "page-view-counts"; - - @Override - public void init(StreamGraph graph, Config config) { - MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class)); - - OutputStream<KV<String, String>> outputStream = - graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new StringSerde())); - - pageViews - .partitionBy(PageView::getUserId, pv -> pv, new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class))) - .window(Windows.keyedSessionWindow(KV::getKey, Duration.ofSeconds(3))) - .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size()))) - .sendTo(outputStream); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java index 4c83960..974cafc 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java +++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java @@ -30,6 +30,7 @@ import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.test.operator.data.PageView; import java.time.Duration; http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java index 9bb66ad..db46982 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java @@ -213,7 +213,6 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration * @param overriddenConfigs configs to override */ public void runApplication(StreamApplication streamApplication, String appName, Config overriddenConfigs) { - Map<String, String> configs = new HashMap<>(); configs.put("job.factory.class", "org.apache.samza.job.local.ThreadJobFactory"); configs.put("job.name", appName); @@ -231,6 +230,17 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration configs.put("job.coordinator.replication.factor", "1"); configs.put("task.window.ms", "1000"); + // This is to prevent tests from taking a long time to stop after they're done. The issue is that + // tearDown currently doesn't call runner.kill(app), and shuts down the Kafka and ZK servers immediately. 
+ // The test process then exits, triggering the SamzaContainer shutdown hook, which in turn tries to flush any + // store changelogs, which then get stuck trying to produce to the stopped Kafka server. + // Calling runner.kill doesn't work since RemoteApplicationRunner creates a new ThreadJob instance when + // kill is called. We can't use LocalApplicationRunner since ZkJobCoordinator doesn't currently create + // changelog streams. Hence we just force an unclean shutdown here instead, by setting a 1 ms task shutdown timeout. This _should be_ OK + // since the test method has already executed by the time the shutdown hook is called. The side effect is + // that buffered state (e.g. changelog contents) might not be flushed correctly after the test run. + configs.put("task.shutdown.ms", "1"); + + if (overriddenConfigs != null) { configs.putAll(overriddenConfigs); } http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java new file mode 100644 index 0000000..117f97b --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.test.operator; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static org.apache.samza.test.operator.RepartitionJoinWindowApp.AD_CLICKS; +import static org.apache.samza.test.operator.RepartitionJoinWindowApp.PAGE_VIEWS; +import static org.apache.samza.test.operator.RepartitionJoinWindowApp.OUTPUT_TOPIC; + +/** + * Test driver for {@link RepartitionJoinWindowApp}. + */ +public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTestHarness { + private static final String APP_NAME = "UserPageAdClickCounter"; + + @Test + public void testRepartitionJoinWindowApp() throws Exception { + // create topics + createTopic(PAGE_VIEWS, 2); + createTopic(AD_CLICKS, 2); + createTopic(OUTPUT_TOPIC, 1); + + // create events for the following user activity. 
+ // userId: (viewId, pageId, (adIds)) + // u1: (v1, p1, (a1, a2)), (v2, p2, (a3)) + // u2: (v3, p1, (a1, a2, a4)), (v4, p3, (a5)) + produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}"); + produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}"); + produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}"); + produceMessage(PAGE_VIEWS, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}"); + + produceMessage(AD_CLICKS, 0, "a1", "{\"viewId\":\"v1\",\"adId\":\"a1\"}"); + produceMessage(AD_CLICKS, 1, "a2", "{\"viewId\":\"v1\",\"adId\":\"a2\"}"); + produceMessage(AD_CLICKS, 0, "a3", "{\"viewId\":\"v2\",\"adId\":\"a3\"}"); + produceMessage(AD_CLICKS, 0, "a1", "{\"viewId\":\"v3\",\"adId\":\"a1\"}"); + produceMessage(AD_CLICKS, 1, "a2", "{\"viewId\":\"v3\",\"adId\":\"a2\"}"); + produceMessage(AD_CLICKS, 1, "a4", "{\"viewId\":\"v3\",\"adId\":\"a4\"}"); + produceMessage(AD_CLICKS, 0, "a5", "{\"viewId\":\"v4\",\"adId\":\"a5\"}"); + + // run the application + RepartitionJoinWindowApp app = new RepartitionJoinWindowApp(); + runApplication(app, APP_NAME, null); + + // consume and validate result + List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2); + Assert.assertEquals(2, messages.size()); + + for (ConsumerRecord<String, String> message : messages) { + String key = message.key(); + String value = message.value(); + Assert.assertTrue(key.equals("u1") || key.equals("u2")); + if ("u1".equals(key)) { + Assert.assertEquals("3", value); + } else { + Assert.assertEquals("4", value); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java ---------------------------------------------------------------------- diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java deleted file mode 100644 index 3745541..0000000 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.test.operator; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Collections; -import java.util.List; - -import static org.apache.samza.test.operator.RepartitionWindowApp.INPUT_TOPIC; -import static org.apache.samza.test.operator.RepartitionWindowApp.OUTPUT_TOPIC; - -/** - * Test driver for {@link RepartitionWindowApp}. - */ -public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHarness { - private static final String APP_NAME = "RepartitionedSessionizer"; - - @Test - public void testRepartitionedSessionWindowCounter() throws Exception { - // create topics - createTopic(INPUT_TOPIC, 3); - createTopic(OUTPUT_TOPIC, 1); - - // produce messages to different partitions. 
- produceMessage(INPUT_TOPIC, 0, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"5.com\"}"); - produceMessage(INPUT_TOPIC, 1, "userId2", "{\"userId\":\"userId2\", \"country\":\"china\",\"url\":\"4.com\"}"); - produceMessage(INPUT_TOPIC, 2, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"1.com\"}"); - produceMessage(INPUT_TOPIC, 0, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"2.com\"}"); - produceMessage(INPUT_TOPIC, 1, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"3.com\"}"); - - // run the application - RepartitionWindowApp app = new RepartitionWindowApp(); - runApplication(app, APP_NAME, null); - - // consume and validate result - List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2); - Assert.assertEquals(messages.size(), 2); - - for (ConsumerRecord<String, String> message : messages) { - String key = message.key(); - String value = message.value(); - // Assert that there are 4 messages for userId1 and 1 message for userId2. 
- Assert.assertTrue(key.equals("userId1") || key.equals("userId2")); - if ("userId1".equals(key)) { - Assert.assertEquals("4", value); - } else { - Assert.assertEquals("1", value); - } - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java index 3f3e615..151c9d1 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java @@ -30,6 +30,7 @@ import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.test.operator.data.PageView; import java.time.Duration; http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java b/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java new file mode 100644 index 0000000..ee699ae --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/AdClick.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.test.operator.data; + + +public class AdClick { + private String viewId; + private String adId; + + public String getViewId() { + return viewId; + } + + public void setViewId(String viewId) { + this.viewId = viewId; + } + + public String getAdId() { + return adId; + } + + public void setAdId(String adId) { + this.adId = adId; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java new file mode 100644 index 0000000..d2cebf9 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.test.operator.data; + + +public class PageView { + private String viewId; + private String pageId; + private String userId; + + public String getViewId() { + return viewId; + } + + public void setViewId(String viewId) { + this.viewId = viewId; + } + + public String getPageId() { + return pageId; + } + + public void setPageId(String pageId) { + this.pageId = pageId; + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/a671288e/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java b/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java new file mode 100644 index 0000000..e5f7b53 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/UserPageAdClick.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.test.operator.data; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +public class UserPageAdClick { + private String userId; + private String pageId; + private String adId; + + @JsonCreator + public UserPageAdClick( + @JsonProperty("userId") String userId, + @JsonProperty("pageId") String pageId, + @JsonProperty("adId") String adId) { + this.userId = userId; + this.pageId = pageId; + this.adId = adId; + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public String getPageId() { + return pageId; + } + + public void setPageId(String pageId) { + this.pageId = pageId; + } + + public String getAdId() { + return adId; + } + + public void setAdId(String adId) { + this.adId = adId; + } +}
