Repository: samza Updated Branches: refs/heads/samza-sql adcd26678 -> 1dac25e17
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java deleted file mode 100644 index 8faa92c..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api; - -import org.junit.Before; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestTriggerBuilder{ - private Field earlyTriggerField; - private Field lateTriggerField; - private Field timerTriggerField; - private Field earlyTriggerUpdater; - private Field lateTriggerUpdater; - - @Before - public void testPrep() throws Exception { - this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger"); - this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger"); - this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger"); - this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater"); - this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater"); - - this.earlyTriggerField.setAccessible(true); - this.lateTriggerField.setAccessible(true); - this.timerTriggerField.setAccessible(true); - this.earlyTriggerUpdater.setAccessible(true); - this.lateTriggerUpdater.setAccessible(true); - } - - @Test public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException { - TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField = - (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(triggerField.apply(null, mockState)); - when(mockState.getNumberMessages()).thenReturn(2000L); - assertTrue(triggerField.apply(null, mockState)); - - Function<TestMessage, Boolean> tokenFunc = m -> true; - builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc); - triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - TestMessage m = mock(TestMessage.class); - assertTrue(triggerField.apply(m, mockState)); - - builder = TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L); - triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L); - when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L); - when(m.getTimestamp()).thenReturn(19999000000L); - assertFalse(triggerField.apply(m, mockState)); - when(m.getTimestamp()).thenReturn(32000000000L); - assertTrue(triggerField.apply(m, mockState)); - when(m.getTimestamp()).thenReturn(1001000000L); - when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L); - assertTrue(triggerField.apply(m, mockState)); - - BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> mockFunc = mock(BiFunction.class); - builder = TriggerBuilder.earlyTrigger(mockFunc); - triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - assertEquals(triggerField, mockFunc); - - builder = TriggerBuilder.timeoutSinceFirstMessage(10000L); - Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger = - (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); - when(mockState.getFirstMessageTimeNs()).thenReturn(0L); - assertTrue(timerTrigger.apply(mockState)); - // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion - when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); - assertFalse(timerTrigger.apply(mockState)); - - builder = TriggerBuilder.timeoutSinceLastMessage(10000L); - timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); - when(mockState.getLastMessageTimeNs()).thenReturn(0L); - assertTrue(timerTrigger.apply(mockState)); - // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion - when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000)); - assertFalse(timerTrigger.apply(mockState)); - } - - @Test public void testAddTimerTriggers() throws IllegalAccessException { - TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - builder.addTimeoutSinceFirstMessage(10000L); - // exam that both earlyTrigger and timer triggers are set up - BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField = - (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(triggerField.apply(null, mockState)); - // check the timer trigger - Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger = - (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); - when(mockState.getFirstMessageTimeNs()).thenReturn(0L); - assertTrue(timerTrigger.apply(mockState)); - // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion - when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); - assertFalse(timerTrigger.apply(mockState)); - - // exam that both early trigger and timer triggers are set up - builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(triggerField.apply(null, mockState)); - builder.addTimeoutSinceLastMessage(20000L); - // check the timer trigger - timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder); - when(mockState.getLastMessageTimeNs()).thenReturn(0L); - assertTrue(timerTrigger.apply(mockState)); - // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion - when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); - assertFalse(timerTrigger.apply(mockState)); - } - - @Test public void testAddLateTriggers() throws IllegalAccessException { - TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - builder.addLateTriggerOnSizeLimit(10000L); - // exam that both earlyTrigger and lateTriggers are set up - BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> earlyTrigger = - (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(earlyTrigger.apply(null, mockState)); - // check the late trigger - BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> lateTrigger = - (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder); - assertFalse(lateTrigger.apply(null, mockState)); - // set the number of messages to 10001 to trigger the late trigger - when(mockState.getNumberMessages()).thenReturn(10001L); - assertTrue(lateTrigger.apply(null, mockState)); - - builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0); - // exam that both earlyTrigger and lateTriggers are set up - earlyTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder); - mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(earlyTrigger.apply(null, mockState)); - // exam the lateTrigger - when(mockState.getOutputValue()).thenReturn(new ArrayList<>()); - lateTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder); - assertFalse(lateTrigger.apply(null, mockState)); - List<TestMessage> mockList = mock(ArrayList.class); - when(mockList.size()).thenReturn(200); - when(mockState.getOutputValue()).thenReturn(mockList); - assertTrue(lateTrigger.apply(null, mockState)); - } - - @Test public void testAddTriggerUpdater() throws IllegalAccessException { - TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - builder.onEarlyTrigger(c -> { c.clear(); return c;} ); - List<TestMessage> collection = new ArrayList<TestMessage>() {{ - for(int i = 0; i < 10; i++) { - this.add(new TestMessage(String.format("key-%d", i), "string-value", System.nanoTime())); - } - }}; - // exam that earlyTriggerUpdater is set up - Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> earlyTriggerUpdater = - (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder); - WindowState<Collection<TestMessage>> mockState = mock(WindowState.class); - when(mockState.getOutputValue()).thenReturn(collection); - earlyTriggerUpdater.apply(mockState); - assertTrue(collection.isEmpty()); - - collection.add(new TestMessage("key-to-stay", "string-to-stay", System.nanoTime())); - collection.add(new TestMessage("key-to-remove", "string-to-remove", System.nanoTime())); - builder.onLateTrigger(c -> { - c.removeIf(t -> t.getKey().equals("key-to-remove")); - return c; - }); - // check the late trigger updater - Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> lateTriggerUpdater = - (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder); - when(mockState.getOutputValue()).thenReturn(collection); - lateTriggerUpdater.apply(mockState); - assertTrue(collection.size() == 1); - assertFalse(collection.get(0).isDelete()); - assertEquals(collection.get(0).getKey(), "key-to-stay"); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java b/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java deleted file mode 100644 index 47a37dc..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api; - -import org.apache.samza.operators.api.Windows.Window; -import org.apache.samza.operators.api.internal.Trigger; -import org.apache.samza.operators.api.internal.WindowOutput; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collection; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestWindows { - - @Test public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException { - // test constructing the default session window - Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, Collection<TestMessage>>> testWnd = Windows.intoSessions( - TestMessage::getKey); - assertTrue(testWnd instanceof Windows.SessionWindow); - Field wndKeyFuncField = Windows.SessionWindow.class.getDeclaredField("wndKeyFunction"); - Field aggregatorField = Windows.SessionWindow.class.getDeclaredField("aggregator"); - wndKeyFuncField.setAccessible(true); - aggregatorField.setAccessible(true); - Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd); - assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "test-key"); - BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> aggrFunc = - (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd); - TestMessage mockMsg = mock(TestMessage.class); - Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new ArrayList<>()); - assertTrue(collection.size() == 1); - assertTrue(collection.contains(mockMsg)); - - // test constructing the session window w/ customized session info - Window<TestMessage, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions( - m -> String.format("key-%d", m.getTimestamp()), m -> m.getMessage().charAt(0)); - assertTrue(testWnd2 instanceof Windows.SessionWindow); - wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2); - aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd2); - assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "key-0"); - when(mockMsg.getMessage()).thenReturn("x-001"); - collection = aggrFunc.apply(mockMsg, new ArrayList<>()); - assertTrue(collection.size() == 1); - assertTrue(collection.contains('x')); - - // test constructing session window w/ a default counter - Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( - m -> String.format("key-%d", m.getTimestamp())); - assertTrue(testCounter instanceof Windows.SessionWindow); - wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testCounter); - BiFunction<TestMessage, Integer, Integer> counterFn = (BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter); - when(mockMsg.getTimestamp()).thenReturn(12345L); - assertEquals(wndKeyFunc.apply(mockMsg), "key-12345"); - assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2)); - } - - @Test public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException { - Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( - m -> String.format("key-%d", m.getTimestamp())); - // test session window w/ a trigger - TriggerBuilder<TestMessage, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L); - testCounter.setTriggers(triggerBuilder); - Trigger<TestMessage, WindowState<Integer>> expectedTrigger = triggerBuilder.build(); - Trigger<TestMessage, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger(); - // examine all trigger fields are expected - Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); - Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); - Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); - Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater"); - Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater"); - earlyTriggerField.setAccessible(true); - lateTriggerField.setAccessible(true); - timerTriggerField.setAccessible(true); - earlyTriggerUpdater.setAccessible(true); - lateTriggerUpdater.setAccessible(true); - assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger)); - assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger)); - assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger)); - assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger)); - assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java deleted file mode 100644 index e953078..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api.data; - -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemStreamPartition; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestIncomingSystemMessage { - - @Test public void testConstructor() { - IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class); - IncomingSystemMessage ism = new IncomingSystemMessage(ime); - - Object mockKey = mock(Object.class); - Object mockValue = mock(Object.class); - LongOffset testOffset = new LongOffset("12345"); - SystemStreamPartition mockSsp = mock(SystemStreamPartition.class); - - when(ime.getKey()).thenReturn(mockKey); - when(ime.getMessage()).thenReturn(mockValue); - when(ime.getSystemStreamPartition()).thenReturn(mockSsp); - when(ime.getOffset()).thenReturn("12345"); - - assertEquals(ism.getKey(), mockKey); - assertEquals(ism.getMessage(), mockValue); - assertEquals(ism.getSystemStreamPartition(), mockSsp); - assertEquals(ism.getOffset(), testOffset); - assertFalse(ism.isDelete()); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java b/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java deleted file mode 100644 index 10775ec..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api.data; - -import org.junit.Test; - -import java.lang.reflect.Field; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; - - -public class TestLongOffset { - - @Test public void testConstructor() throws Exception { - LongOffset o1 = new LongOffset("12345"); - Field offsetField = LongOffset.class.getDeclaredField("offset"); - offsetField.setAccessible(true); - Long x = (Long) offsetField.get(o1); - assertEquals(x.longValue(), 12345L); - - o1 = new LongOffset("012345"); - x = (Long) offsetField.get(o1); - assertEquals(x.longValue(), 12345L); - - try { - o1 = new LongOffset("xyz"); - fail("Constructor of LongOffset should have failed w/ mal-formatted numbers"); - } catch (NumberFormatException nfe) { - // expected - } - } - - @Test public void testComparator() { - LongOffset o1 = new LongOffset("11111"); - Offset other = mock(Offset.class); - try { - o1.compareTo(other); - fail("compareTo() should have have failed when comparing to an object of a different class"); - } catch (IllegalArgumentException iae) { - // expected - } - - LongOffset o2 = new LongOffset("-10000"); - assertEquals(o1.compareTo(o2), 1); - LongOffset o3 = new LongOffset("22222"); - assertEquals(o1.compareTo(o3), -1); - LongOffset o4 = new LongOffset("11111"); - assertEquals(o1.compareTo(o4), 0); - } - - @Test public void testEquals() { - LongOffset o1 = new LongOffset("12345"); - Offset other = mock(Offset.class); - assertFalse(o1.equals(other)); - - LongOffset o2 = new LongOffset("0012345"); - assertTrue(o1.equals(o2)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java deleted file mode 100644 index 6dc77e5..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api.internal; - -import org.apache.samza.operators.api.MessageStream; -import org.apache.samza.operators.api.WindowState; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestOperators { - - private class TestMessage implements Message<String, Object> { - private final long timestamp; - private final String key; - private final Object msg; - - - TestMessage(String key, Object msg, long timestamp) { - this.timestamp = timestamp; - this.key = key; - this.msg = msg; - } - - @Override public Object getMessage() { - return this.msg; - } - - @Override public String getKey() { - return this.key; - } - - @Override public long getTimestamp() { - return this.timestamp; - } - } - - @Test public void testGetStreamOperator() { - Function<Message, Collection<TestMessage>> transformFn = m -> new ArrayList<TestMessage>() {{ - this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 12345L)); - }}; - Operators.StreamOperator<Message, TestMessage> strmOp = Operators.getStreamOperator(transformFn); - assertEquals(strmOp.getFunction(), transformFn); - assertTrue(strmOp.getOutputStream() instanceof MessageStream); - } - - @Test public void testGetSinkOperator() { - MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, c, t) -> {}; - Operators.SinkOperator<TestMessage> sinkOp = Operators.getSinkOperator(sinkFn); - assertEquals(sinkOp.getFunction(), sinkFn); - assertTrue(sinkOp.getOutputStream() == null); - } - - @Test public void testGetWindowOperator() { - WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class); - BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction = (m, e) -> null; - Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> storeFns = mock(Operators.StoreFunctions.class); - Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class); - MessageStream<TestMessage> mockInput = mock(MessageStream.class); - when(windowFn.getTransformFunc()).thenReturn(xFunction); - when(windowFn.getStoreFuncs()).thenReturn(storeFns); - when(windowFn.getTrigger()).thenReturn(trigger); - when(mockInput.toString()).thenReturn("mockStream1"); - - Operators.WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn); - assertEquals(windowOp.getFunction(), xFunction); - assertEquals(windowOp.getStoreFunctions(), storeFns); - assertEquals(windowOp.getTrigger(), trigger); - assertEquals(windowOp.getStoreName(mockInput), String.format("input-mockStream1-wndop-%s", windowOp.toString())); - } - - @Test public void testGetPartialJoinOperator() { - BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger = - (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(), - Math.max(m1.getTimestamp(), m2.getTimestamp())); - MessageStream<TestMessage> joinOutput = new MessageStream<>(); - Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, ?>, TestMessage> partialJoin = - Operators.getPartialJoinOperator(merger, joinOutput); - - assertEquals(partialJoin.getOutputStream(), joinOutput); - Message<Object, Object> m = mock(Message.class); - Message<Object, Object> s = mock(Message.class); - assertEquals(partialJoin.getFunction(), merger); - assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), m.getKey()); - assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), m); - assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), m.getKey()); - assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater()); - } - - @Test public void testGetMergeOperator() { - MessageStream<TestMessage> output = new MessageStream<>(); - Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output); - Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() {{ - this.add(t); - }}; - TestMessage t = mock(TestMessage.class); - assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t)); - assertEquals(mergeOp.getOutputStream(), output); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java deleted file mode 100644 index 727276a..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api.internal; - -import org.apache.samza.operators.api.WindowState; -import org.apache.samza.operators.api.data.Message; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.assertEquals; - - -public class TestTrigger { - - @Test public void testConstructor() throws Exception { - BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000; - BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000; - Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis(); - Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> { s.setOutputValue(0); return s; }; - Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> { s.setOutputValue(1); return s; }; - - Trigger<Message<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger, - earlyTriggerUpdater, lateTriggerUpdater); - - Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); - Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); - Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); - Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater"); - Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater"); - earlyTriggerField.setAccessible(true); - lateTriggerField.setAccessible(true); - timerTriggerField.setAccessible(true); - earlyTriggerUpdaterField.setAccessible(true); - lateTriggerUpdaterField.setAccessible(true); - - assertEquals(earlyTrigger, earlyTriggerField.get(trigger)); - assertEquals(timerTrigger, timerTriggerField.get(trigger)); - assertEquals(lateTrigger, lateTriggerField.get(trigger)); - assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger)); - assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java b/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java deleted file mode 100644 index f3cf0e0..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestWindowOutput.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.api.internal; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - - -public class TestWindowOutput { - - @Test public void testConstructor() { - WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10); - assertEquals(wndOutput.getKey(), "testMsg"); - assertEquals(wndOutput.getMessage(), Integer.valueOf(10)); - assertFalse(wndOutput.isDelete()); - assertEquals(wndOutput.getTimestamp(), 0); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java index d4d2378..33901a9 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java @@ -19,10 +19,10 @@ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.MessageStream; -import org.apache.samza.operators.api.TestMessage; -import org.apache.samza.operators.api.TestOutputMessage; -import org.apache.samza.operators.api.Windows; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.TestMessage; +import org.apache.samza.operators.TestOutputMessage; +import org.apache.samza.operators.Windows; import org.apache.samza.task.TaskContext; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java index d228784..a49bfd3 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java @@ -18,14 +18,14 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.MessageStream; -import org.apache.samza.operators.api.TestMessage; -import org.apache.samza.operators.api.TestOutputMessage; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.internal.Operators.PartialJoinOperator; -import org.apache.samza.operators.api.internal.Operators.SinkOperator; -import org.apache.samza.operators.api.internal.Operators.StreamOperator; -import org.apache.samza.operators.api.internal.Operators.WindowOperator; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.TestMessage; +import org.apache.samza.operators.TestOutputMessage; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators.PartialJoinOperator; +import org.apache.samza.operators.internal.Operators.SinkOperator; +import org.apache.samza.operators.internal.Operators.StreamOperator; +import org.apache.samza.operators.internal.Operators.WindowOperator; import org.apache.samza.operators.impl.join.PartialJoinOpImpl; import org.apache.samza.operators.impl.window.SessionWindowImpl; import org.apache.samza.task.MessageCollector; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index d296111..4bd467d 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -18,8 +18,8 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.TestMessage; -import org.apache.samza.operators.api.TestOutputMessage; +import org.apache.samza.operators.TestMessage; +import org.apache.samza.operators.TestOutputMessage; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java index 14796fc..224245e 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java @@ -18,7 +18,7 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.TestMessage; +import org.apache.samza.operators.TestMessage; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java index c8c4944..69f16d0 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java @@ -18,9 +18,9 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.TestMessage; -import org.apache.samza.operators.api.TestOutputMessage; -import org.apache.samza.operators.api.internal.Operators.StreamOperator; +import org.apache.samza.operators.TestMessage; +import org.apache.samza.operators.TestOutputMessage; +import org.apache.samza.operators.internal.Operators.StreamOperator; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java index e711bc5..cdac3fc 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java @@ -18,9 +18,9 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.MessageStream; -import org.apache.samza.operators.api.TestOutputMessage; -import org.apache.samza.operators.api.internal.Operators.SinkOperator; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.TestOutputMessage; +import org.apache.samza.operators.internal.Operators.SinkOperator; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java index eb8937a..5ede757 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java @@ -18,9 +18,9 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.api.TestMessage; -import org.apache.samza.operators.api.WindowState; -import org.apache.samza.operators.api.internal.Operators.StoreFunctions; +import org.apache.samza.operators.TestMessage; +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.internal.Operators.StoreFunctions; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.TaskContext; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java index 75cb00c..719ab99 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java @@ -18,12 +18,12 @@ */ package org.apache.samza.operators.impl.window; -import org.apache.samza.operators.api.MessageStream; -import org.apache.samza.operators.api.TestMessage; -import org.apache.samza.operators.api.WindowState; -import org.apache.samza.operators.api.internal.Operators.StoreFunctions; -import org.apache.samza.operators.api.internal.Operators.WindowOperator; -import org.apache.samza.operators.api.internal.WindowOutput; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.TestMessage; +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.internal.Operators.StoreFunctions; +import org.apache.samza.operators.internal.Operators.WindowOperator; +import org.apache.samza.operators.internal.WindowOutput; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.MessageCollector; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java index 724bbba..493a688 100644 --- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java +++ b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java @@ -20,12 +20,13 @@ package org.apache.samza.task; import org.apache.avro.generic.GenericRecord; -import org.apache.samza.operators.api.MessageStream; -import org.apache.samza.operators.api.MessageStreams.SystemMessageStream; -import org.apache.samza.operators.api.Windows; -import org.apache.samza.operators.api.TriggerBuilder; -import org.apache.samza.operators.api.data.IncomingSystemMessage; -import org.apache.samza.operators.api.data.Offset; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.MessageStreams.SystemMessageStream; +import org.apache.samza.operators.Windows; +import org.apache.samza.operators.TriggerBuilder; +import org.apache.samza.operators.data.IncomingSystemMessage; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.task.StreamOperatorTask; import org.apache.samza.system.SystemStreamPartition; import java.util.Collection; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java index 33ae9c9..820d4f3 100644 --- a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java +++ b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java @@ -18,9 +18,9 @@ */ package org.apache.samza.task; -import org.apache.samza.operators.api.data.InputSystemMessage; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.data.Offset; +import org.apache.samza.operators.data.InputSystemMessage; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.data.Offset; import org.apache.samza.system.SystemStreamPartition; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java index 825f4c4..00d01a8 100644 --- a/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java +++ b/samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java @@ -19,10 +19,11 @@ package org.apache.samza.task; -import org.apache.samza.operators.api.MessageStream; -import org.apache.samza.operators.api.MessageStreams.SystemMessageStream; -import org.apache.samza.operators.api.data.IncomingSystemMessage; -import org.apache.samza.operators.api.data.Offset; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.MessageStreams.SystemMessageStream; +import org.apache.samza.operators.data.IncomingSystemMessage; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.task.StreamOperatorTask; import org.apache.samza.system.SystemStreamPartition; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java index 306425e..153d517 100644 --- a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java +++ b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java @@ -20,6 +20,7 @@ package org.apache.samza.task; import org.apache.samza.config.Config; import org.apache.samza.operators.impl.ChainedOperators; +import org.apache.samza.operators.task.StreamOperatorTask; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.Partition; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java index d6181ea..fe0ca42 100644 --- a/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java +++ b/samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java @@ -36,7 +36,7 @@ import static org.mockito.Mockito.when; /** - * Unit test for {@link StreamOperatorTask} + * Unit test for {@link org.apache.samza.operators.task.StreamOperatorTask} */ public class TestStreamOperatorTasks { http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java index 11186ea..de7bba5 100644 --- a/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java +++ b/samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java @@ -19,11 +19,12 @@ package org.apache.samza.task; -import org.apache.samza.operators.api.MessageStreams.SystemMessageStream; -import org.apache.samza.operators.api.TriggerBuilder; -import org.apache.samza.operators.api.Windows; -import org.apache.samza.operators.api.data.Message; -import org.apache.samza.operators.api.data.Offset; +import org.apache.samza.operators.MessageStreams.SystemMessageStream; +import org.apache.samza.operators.TriggerBuilder; +import org.apache.samza.operators.Windows; +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.task.StreamOperatorTask; import org.apache.samza.system.SystemStreamPartition; import java.util.Collection;
