http://git-wip-us.apache.org/repos/asf/beam/blob/f7dc6160/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java new file mode 100644 index 0000000..cd12c92 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java @@ -0,0 +1,633 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.dataflow; + +import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.dataflow.BatchViewOverrides.TransformedMap; +import org.apache.beam.runners.dataflow.internal.IsmFormat; +import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord; +import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder; +import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link BatchViewOverrides}. 
+ */ +@RunWith(JUnit4.class) +public class BatchViewOverridesTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + @Test + public void testBatchViewAsSingletonToIsmRecord() throws Exception { + DoFnTester< + KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>, + IsmRecord<WindowedValue<String>>> + doFnTester = + DoFnTester.of( + new BatchViewOverrides.BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn< + String, GlobalWindow>(GlobalWindow.Coder.INSTANCE)); + + assertThat( + doFnTester.processBundle( + ImmutableList.of( + KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of( + 0, ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")))))), + contains(IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE), valueInGlobalWindow("a")))); + } + + @Test + public void testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException() + throws Exception { + DoFnTester< + KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>, + IsmRecord<WindowedValue<String>>> + doFnTester = + DoFnTester.of( + new BatchViewOverrides.BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn< + String, GlobalWindow>(GlobalWindow.Coder.INSTANCE)); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("found for singleton within window"); + doFnTester.processBundle( + ImmutableList.of( + KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of( + 0, + ImmutableList.of( + KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")), + KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b")))))); + } + + @Test + public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception { + DoFnTester<String, IsmRecord<WindowedValue<String>>> doFnTester = + DoFnTester.of( + new BatchViewOverrides.BatchViewAsList.ToIsmRecordForGlobalWindowDoFn<String>()); + + // The order of the output elements is important relative to processing order + assertThat( + 
doFnTester.processBundle(ImmutableList.of("a", "b", "c")), + contains( + IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 0L), valueInGlobalWindow("a")), + IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 1L), valueInGlobalWindow("b")), + IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 2L), valueInGlobalWindow("c")))); + } + + @Test + public void testBatchViewAsListToIsmRecordForNonGlobalWindow() throws Exception { + DoFnTester< + KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>, + IsmRecord<WindowedValue<Long>>> + doFnTester = + DoFnTester.of( + new BatchViewOverrides.BatchViewAsList.ToIsmRecordForNonGlobalWindowDoFn< + Long, IntervalWindow>(IntervalWindow.getCoder())); + + IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); + IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); + IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); + + Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>> inputElements = + ImmutableList.of( + KV.of( + 1, + (Iterable<KV<IntervalWindow, WindowedValue<Long>>>) + ImmutableList.of( + KV.of( + windowA, + WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), + KV.of( + windowA, + WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)), + KV.of( + windowA, + WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)), + KV.of( + windowB, + WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)), + KV.of( + windowB, + WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING)))), + KV.of( + 2, + (Iterable<KV<IntervalWindow, WindowedValue<Long>>>) + ImmutableList.of( + KV.of( + windowC, + WindowedValue.of( + 210L, new Instant(25), windowC, PaneInfo.NO_FIRING))))); + + // The order of the output elements is important relative to processing order + assertThat( + doFnTester.processBundle(inputElements), + contains( + IsmRecord.of( + ImmutableList.of(windowA, 
0L), + WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), + IsmRecord.of( + ImmutableList.of(windowA, 1L), + WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)), + IsmRecord.of( + ImmutableList.of(windowA, 2L), + WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)), + IsmRecord.of( + ImmutableList.of(windowB, 0L), + WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)), + IsmRecord.of( + ImmutableList.of(windowB, 1L), + WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING)), + IsmRecord.of( + ImmutableList.of(windowC, 0L), + WindowedValue.of(210L, new Instant(25), windowC, PaneInfo.NO_FIRING)))); + } + + @Test + public void testToIsmRecordForMapLikeDoFn() throws Exception { + TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>(); + TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>(); + + Coder<Long> keyCoder = VarLongCoder.of(); + Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); + + IsmRecordCoder<WindowedValue<Long>> ismCoder = + IsmRecordCoder.of( + 1, + 2, + ImmutableList.<Coder<?>>of( + MetadataKeyCoder.of(keyCoder), IntervalWindow.getCoder(), BigEndianLongCoder.of()), + FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder)); + + DoFnTester< + KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>, + IsmRecord<WindowedValue<Long>>> + doFnTester = + DoFnTester.of( + new BatchViewOverrides.BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>( + outputForSizeTag, + outputForEntrySetTag, + windowCoder, + keyCoder, + ismCoder, + false /* unique keys */)); + + IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); + IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); + IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); + + Iterable<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>> + 
inputElements = + ImmutableList.of( + KV.of( + 1, + (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) + ImmutableList.of( + KV.of( + KV.of(1L, windowA), + WindowedValue.of( + 110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), + // same window same key as to previous + KV.of( + KV.of(1L, windowA), + WindowedValue.of( + 111L, new Instant(2), windowA, PaneInfo.NO_FIRING)), + // same window different key as to previous + KV.of( + KV.of(2L, windowA), + WindowedValue.of( + 120L, new Instant(3), windowA, PaneInfo.NO_FIRING)), + // different window same key as to previous + KV.of( + KV.of(2L, windowB), + WindowedValue.of( + 210L, new Instant(11), windowB, PaneInfo.NO_FIRING)), + // different window and different key as to previous + KV.of( + KV.of(3L, windowB), + WindowedValue.of( + 220L, new Instant(12), windowB, PaneInfo.NO_FIRING)))), + KV.of( + 2, + (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) + ImmutableList.of( + // different shard + KV.of( + KV.of(4L, windowC), + WindowedValue.of( + 330L, new Instant(21), windowC, PaneInfo.NO_FIRING))))); + + // The order of the output elements is important relative to processing order + assertThat( + doFnTester.processBundle(inputElements), + contains( + IsmRecord.of( + ImmutableList.of(1L, windowA, 0L), + WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), + IsmRecord.of( + ImmutableList.of(1L, windowA, 1L), + WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)), + IsmRecord.of( + ImmutableList.of(2L, windowA, 0L), + WindowedValue.of(120L, new Instant(3), windowA, PaneInfo.NO_FIRING)), + IsmRecord.of( + ImmutableList.of(2L, windowB, 0L), + WindowedValue.of(210L, new Instant(11), windowB, PaneInfo.NO_FIRING)), + IsmRecord.of( + ImmutableList.of(3L, windowB, 0L), + WindowedValue.of(220L, new Instant(12), windowB, PaneInfo.NO_FIRING)), + IsmRecord.of( + ImmutableList.of(4L, windowC, 0L), + WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING)))); + + // 
Verify the number of unique keys per window. + assertThat( + doFnTester.takeSideOutputElements(outputForSizeTag), + contains( + KV.of( + ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), + KV.of(windowA, 2L)), + KV.of( + ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), + KV.of(windowB, 2L)), + KV.of( + ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)), + KV.of(windowC, 1L)))); + + // Verify the output for the unique keys. + assertThat( + doFnTester.takeSideOutputElements(outputForEntrySetTag), + contains( + KV.of( + ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), + KV.of(windowA, 1L)), + KV.of( + ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), + KV.of(windowA, 2L)), + KV.of( + ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), + KV.of(windowB, 2L)), + KV.of( + ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), + KV.of(windowB, 3L)), + KV.of( + ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)), + KV.of(windowC, 4L)))); + } + + @Test + public void testToIsmRecordForMapLikeDoFnWithoutUniqueKeysThrowsException() throws Exception { + TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>(); + TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>(); + + Coder<Long> keyCoder = VarLongCoder.of(); + Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); + + IsmRecordCoder<WindowedValue<Long>> ismCoder = + IsmRecordCoder.of( + 1, + 2, + ImmutableList.<Coder<?>>of( + MetadataKeyCoder.of(keyCoder), IntervalWindow.getCoder(), BigEndianLongCoder.of()), + FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder)); + + DoFnTester< + KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>, + IsmRecord<WindowedValue<Long>>> + doFnTester = + DoFnTester.of( + new BatchViewOverrides.BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>( + 
outputForSizeTag, + outputForEntrySetTag, + windowCoder, + keyCoder, + ismCoder, + true /* unique keys */)); + + IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); + + Iterable<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>> + inputElements = + ImmutableList.of( + KV.of( + 1, + (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) + ImmutableList.of( + KV.of( + KV.of(1L, windowA), + WindowedValue.of( + 110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), + // same window same key as to previous + KV.of( + KV.of(1L, windowA), + WindowedValue.of( + 111L, new Instant(2), windowA, PaneInfo.NO_FIRING))))); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Unique keys are expected but found key"); + doFnTester.processBundle(inputElements); + } + + @Test + public void testToIsmMetadataRecordForSizeDoFn() throws Exception { + TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>(); + TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>(); + + Coder<Long> keyCoder = VarLongCoder.of(); + Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); + + IsmRecordCoder<WindowedValue<Long>> ismCoder = + IsmRecordCoder.of( + 1, + 2, + ImmutableList.<Coder<?>>of( + MetadataKeyCoder.of(keyCoder), IntervalWindow.getCoder(), BigEndianLongCoder.of()), + FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder)); + + DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>, IsmRecord<WindowedValue<Long>>> + doFnTester = + DoFnTester.of( + new BatchViewOverrides.BatchViewAsMultimap.ToIsmMetadataRecordForSizeDoFn< + Long, Long, IntervalWindow>(windowCoder)); + + IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); + IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); + IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); + + Iterable<KV<Integer, 
Iterable<KV<IntervalWindow, Long>>>> inputElements = + ImmutableList.of( + KV.of( + 1, + (Iterable<KV<IntervalWindow, Long>>) + ImmutableList.of(KV.of(windowA, 2L), KV.of(windowA, 3L), KV.of(windowB, 7L))), + KV.of( + ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), + (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(KV.of(windowC, 9L)))); + + // The order of the output elements is important relative to processing order + assertThat( + doFnTester.processBundle(inputElements), + contains( + IsmRecord.<WindowedValue<Long>>meta( + ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 0L), + CoderUtils.encodeToByteArray(VarLongCoder.of(), 5L)), + IsmRecord.<WindowedValue<Long>>meta( + ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 0L), + CoderUtils.encodeToByteArray(VarLongCoder.of(), 7L)), + IsmRecord.<WindowedValue<Long>>meta( + ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 0L), + CoderUtils.encodeToByteArray(VarLongCoder.of(), 9L)))); + } + + @Test + public void testToIsmMetadataRecordForKeyDoFn() throws Exception { + TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>(); + TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>(); + + Coder<Long> keyCoder = VarLongCoder.of(); + Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); + + IsmRecordCoder<WindowedValue<Long>> ismCoder = + IsmRecordCoder.of( + 1, + 2, + ImmutableList.<Coder<?>>of( + MetadataKeyCoder.of(keyCoder), IntervalWindow.getCoder(), BigEndianLongCoder.of()), + FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder)); + + DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>, IsmRecord<WindowedValue<Long>>> + doFnTester = + DoFnTester.of( + new BatchViewOverrides.BatchViewAsMultimap.ToIsmMetadataRecordForKeyDoFn< + Long, Long, IntervalWindow>(keyCoder, windowCoder)); + + IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); + IntervalWindow windowB = new 
IntervalWindow(new Instant(10), new Instant(20)); + IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); + + Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements = + ImmutableList.of( + KV.of( + 1, + (Iterable<KV<IntervalWindow, Long>>) + ImmutableList.of( + KV.of(windowA, 2L), + // same window as previous + KV.of(windowA, 3L), + // different window as previous + KV.of(windowB, 3L))), + KV.of( + ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), + (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(KV.of(windowC, 3L)))); + + // The order of the output elements is important relative to processing order + assertThat( + doFnTester.processBundle(inputElements), + contains( + IsmRecord.<WindowedValue<Long>>meta( + ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 1L), + CoderUtils.encodeToByteArray(VarLongCoder.of(), 2L)), + IsmRecord.<WindowedValue<Long>>meta( + ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 2L), + CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)), + IsmRecord.<WindowedValue<Long>>meta( + ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 1L), + CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)), + IsmRecord.<WindowedValue<Long>>meta( + ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 1L), + CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)))); + } + + @Test + public void testToMapDoFn() throws Exception { + Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); + + DoFnTester< + KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>, + IsmRecord<WindowedValue<TransformedMap<Long, WindowedValue<Long>, Long>>>> + doFnTester = + DoFnTester.of( + new BatchViewOverrides.BatchViewAsMap.ToMapDoFn<Long, Long, IntervalWindow>( + windowCoder)); + + IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); + IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); + IntervalWindow windowC = new 
IntervalWindow(new Instant(20), new Instant(30)); + + Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>> + inputElements = + ImmutableList.of( + KV.of( + 1, + (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) + ImmutableList.of( + KV.of( + windowA, + WindowedValue.of( + KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)), + KV.of( + windowA, + WindowedValue.of( + KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)), + KV.of( + windowB, + WindowedValue.of( + KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)), + KV.of( + windowB, + WindowedValue.of( + KV.of(3L, 31L), + new Instant(15), + windowB, + PaneInfo.NO_FIRING)))), + KV.of( + 2, + (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) + ImmutableList.of( + KV.of( + windowC, + WindowedValue.of( + KV.of(4L, 41L), + new Instant(25), + windowC, + PaneInfo.NO_FIRING))))); + + // The order of the output elements is important relative to processing order + List<IsmRecord<WindowedValue<TransformedMap<Long, WindowedValue<Long>, Long>>>> output = + doFnTester.processBundle(inputElements); + assertEquals(3, output.size()); + Map<Long, Long> outputMap; + + outputMap = output.get(0).getValue().getValue(); + assertEquals(2, outputMap.size()); + assertEquals(ImmutableMap.of(1L, 11L, 2L, 21L), outputMap); + + outputMap = output.get(1).getValue().getValue(); + assertEquals(2, outputMap.size()); + assertEquals(ImmutableMap.of(2L, 21L, 3L, 31L), outputMap); + + outputMap = output.get(2).getValue().getValue(); + assertEquals(1, outputMap.size()); + assertEquals(ImmutableMap.of(4L, 41L), outputMap); + } + + @Test + public void testToMultimapDoFn() throws Exception { + Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); + + DoFnTester< + KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>, + IsmRecord< + WindowedValue<TransformedMap<Long, Iterable<WindowedValue<Long>>, Iterable<Long>>>>> + doFnTester = + DoFnTester.of( 
+ new BatchViewOverrides.BatchViewAsMultimap.ToMultimapDoFn< + Long, Long, IntervalWindow>(windowCoder)); + + IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); + IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); + IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); + + Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>> + inputElements = + ImmutableList.of( + KV.of( + 1, + (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) + ImmutableList.of( + KV.of( + windowA, + WindowedValue.of( + KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)), + KV.of( + windowA, + WindowedValue.of( + KV.of(1L, 12L), new Instant(5), windowA, PaneInfo.NO_FIRING)), + KV.of( + windowA, + WindowedValue.of( + KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)), + KV.of( + windowB, + WindowedValue.of( + KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)), + KV.of( + windowB, + WindowedValue.of( + KV.of(3L, 31L), + new Instant(15), + windowB, + PaneInfo.NO_FIRING)))), + KV.of( + 2, + (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) + ImmutableList.of( + KV.of( + windowC, + WindowedValue.of( + KV.of(4L, 41L), + new Instant(25), + windowC, + PaneInfo.NO_FIRING))))); + + // The order of the output elements is important relative to processing order + List< + IsmRecord< + WindowedValue<TransformedMap<Long, Iterable<WindowedValue<Long>>, Iterable<Long>>>>> + output = doFnTester.processBundle(inputElements); + assertEquals(3, output.size()); + Map<Long, Iterable<Long>> outputMap; + + outputMap = output.get(0).getValue().getValue(); + assertEquals(2, outputMap.size()); + assertThat(outputMap.get(1L), containsInAnyOrder(11L, 12L)); + assertThat(outputMap.get(2L), containsInAnyOrder(21L)); + + outputMap = output.get(1).getValue().getValue(); + assertEquals(2, outputMap.size()); + assertThat(outputMap.get(2L), containsInAnyOrder(21L)); + 
assertThat(outputMap.get(3L), containsInAnyOrder(31L)); + + outputMap = output.get(2).getValue().getValue(); + assertEquals(1, outputMap.size()); + assertThat(outputMap.get(4L), containsInAnyOrder(41L)); + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/f7dc6160/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index b2bc319..d847880 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -17,15 +17,12 @@ */ package org.apache.beam.runners.dataflow; -import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; import static org.hamcrest.Matchers.both; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; -import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -45,7 +42,6 @@ import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import java.io.File; import java.io.IOException; import java.net.URL; @@ -59,25 +55,13 @@ import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.regex.Pattern; -import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList; 
-import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap; -import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap; -import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsSingleton; -import org.apache.beam.runners.dataflow.DataflowRunner.TransformedMap; -import org.apache.beam.runners.dataflow.internal.IsmFormat; -import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord; -import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder; -import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; @@ -89,26 +73,17 @@ import org.apache.beam.sdk.runners.dataflow.TestCountingSource; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.NoopCredentialFactory; import org.apache.beam.sdk.util.NoopPathValidator; import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.util.TestCredential; -import 
org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TupleTag; import org.hamcrest.Description; import org.hamcrest.Matchers; import org.hamcrest.TypeSafeMatcher; @@ -1090,461 +1065,6 @@ public class DataflowRunnerTest { testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false); } - @Test - public void testBatchViewAsSingletonToIsmRecord() throws Exception { - DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>, - IsmRecord<WindowedValue<String>>> doFnTester = - DoFnTester.of( - new BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn - <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE)); - - assertThat( - doFnTester.processBundle( - ImmutableList.of(KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of( - 0, ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")))))), - contains(IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE), valueInGlobalWindow("a")))); - } - - @Test - public void testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException() - throws Exception { - DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>, - IsmRecord<WindowedValue<String>>> doFnTester = - DoFnTester.of( - new BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn - <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE)); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("found for singleton within window"); - doFnTester.processBundle(ImmutableList.of( - KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of(0, - ImmutableList.of(KV.of(GlobalWindow.INSTANCE, 
valueInGlobalWindow("a")), - KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b")))))); - } - - @Test - public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception { - DoFnTester<String, IsmRecord<WindowedValue<String>>> doFnTester = - DoFnTester.of(new BatchViewAsList.ToIsmRecordForGlobalWindowDoFn<String>()); - - // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBundle(ImmutableList.of("a", "b", "c")), contains( - IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 0L), valueInGlobalWindow("a")), - IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 1L), valueInGlobalWindow("b")), - IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 2L), valueInGlobalWindow("c")))); - } - - @Test - public void testBatchViewAsListToIsmRecordForNonGlobalWindow() throws Exception { - DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>, - IsmRecord<WindowedValue<Long>>> doFnTester = - DoFnTester.of( - new BatchViewAsList.ToIsmRecordForNonGlobalWindowDoFn<Long, IntervalWindow>( - IntervalWindow.getCoder())); - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); - IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); - - Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<Long>>>>> inputElements = - ImmutableList.of( - KV.of(1, (Iterable<KV<IntervalWindow, WindowedValue<Long>>>) ImmutableList.of( - KV.of( - windowA, WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), - KV.of( - windowA, WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)), - KV.of( - windowA, WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)), - KV.of( - windowB, WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)), - KV.of( - windowB, WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING)) - 
)), - KV.of(2, (Iterable<KV<IntervalWindow, WindowedValue<Long>>>) ImmutableList.of( - KV.of( - windowC, WindowedValue.of(210L, new Instant(25), windowC, PaneInfo.NO_FIRING)) - ))); - - // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBundle(inputElements), contains( - IsmRecord.of(ImmutableList.of(windowA, 0L), - WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), - IsmRecord.of(ImmutableList.of(windowA, 1L), - WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)), - IsmRecord.of(ImmutableList.of(windowA, 2L), - WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)), - IsmRecord.of(ImmutableList.of(windowB, 0L), - WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)), - IsmRecord.of(ImmutableList.of(windowB, 1L), - WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING)), - IsmRecord.of(ImmutableList.of(windowC, 0L), - WindowedValue.of(210L, new Instant(25), windowC, PaneInfo.NO_FIRING)))); - } - - @Test - public void testToIsmRecordForMapLikeDoFn() throws Exception { - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>(); - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>(); - - Coder<Long> keyCoder = VarLongCoder.of(); - Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); - - IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of( - 1, - 2, - ImmutableList.<Coder<?>>of( - MetadataKeyCoder.of(keyCoder), - IntervalWindow.getCoder(), - BigEndianLongCoder.of()), - FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder)); - - DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>, - IsmRecord<WindowedValue<Long>>> doFnTester = - DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>( - outputForSizeTag, - outputForEntrySetTag, - windowCoder, - keyCoder, - ismCoder, - false /* unique keys */)); - - 
IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); - IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); - - Iterable<KV<Integer, - Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>> inputElements = - ImmutableList.of( - KV.of(1, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of( - KV.of(KV.of(1L, windowA), - WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), - // same window same key as to previous - KV.of(KV.of(1L, windowA), - WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)), - // same window different key as to previous - KV.of(KV.of(2L, windowA), - WindowedValue.of(120L, new Instant(3), windowA, PaneInfo.NO_FIRING)), - // different window same key as to previous - KV.of(KV.of(2L, windowB), - WindowedValue.of(210L, new Instant(11), windowB, PaneInfo.NO_FIRING)), - // different window and different key as to previous - KV.of(KV.of(3L, windowB), - WindowedValue.of(220L, new Instant(12), windowB, PaneInfo.NO_FIRING)))), - KV.of(2, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of( - // different shard - KV.of(KV.of(4L, windowC), - WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING))))); - - // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBundle(inputElements), contains( - IsmRecord.of( - ImmutableList.of(1L, windowA, 0L), - WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), - IsmRecord.of( - ImmutableList.of(1L, windowA, 1L), - WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)), - IsmRecord.of( - ImmutableList.of(2L, windowA, 0L), - WindowedValue.of(120L, new Instant(3), windowA, PaneInfo.NO_FIRING)), - IsmRecord.of( - ImmutableList.of(2L, windowB, 0L), - WindowedValue.of(210L, new Instant(11), windowB, 
PaneInfo.NO_FIRING)), - IsmRecord.of( - ImmutableList.of(3L, windowB, 0L), - WindowedValue.of(220L, new Instant(12), windowB, PaneInfo.NO_FIRING)), - IsmRecord.of( - ImmutableList.of(4L, windowC, 0L), - WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING)))); - - // Verify the number of unique keys per window. - assertThat(doFnTester.takeSideOutputElements(outputForSizeTag), contains( - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), - KV.of(windowA, 2L)), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), - KV.of(windowB, 2L)), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)), - KV.of(windowC, 1L)) - )); - - // Verify the output for the unique keys. - assertThat(doFnTester.takeSideOutputElements(outputForEntrySetTag), contains( - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), - KV.of(windowA, 1L)), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), - KV.of(windowA, 2L)), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), - KV.of(windowB, 2L)), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), - KV.of(windowB, 3L)), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)), - KV.of(windowC, 4L)) - )); - } - - @Test - public void testToIsmRecordForMapLikeDoFnWithoutUniqueKeysThrowsException() throws Exception { - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>(); - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>(); - - Coder<Long> keyCoder = VarLongCoder.of(); - Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); - - IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of( - 1, - 2, - ImmutableList.<Coder<?>>of( - MetadataKeyCoder.of(keyCoder), - IntervalWindow.getCoder(), - BigEndianLongCoder.of()), - FullWindowedValueCoder.of(VarLongCoder.of(), 
windowCoder)); - - DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>, - IsmRecord<WindowedValue<Long>>> doFnTester = - DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<>( - outputForSizeTag, - outputForEntrySetTag, - windowCoder, - keyCoder, - ismCoder, - true /* unique keys */)); - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - - Iterable<KV<Integer, - Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>> inputElements = - ImmutableList.of( - KV.of(1, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of( - KV.of(KV.of(1L, windowA), - WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), - // same window same key as to previous - KV.of(KV.of(1L, windowA), - WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING))))); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Unique keys are expected but found key"); - doFnTester.processBundle(inputElements); - } - - @Test - public void testToIsmMetadataRecordForSizeDoFn() throws Exception { - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>(); - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>(); - - Coder<Long> keyCoder = VarLongCoder.of(); - Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); - - IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of( - 1, - 2, - ImmutableList.<Coder<?>>of( - MetadataKeyCoder.of(keyCoder), - IntervalWindow.getCoder(), - BigEndianLongCoder.of()), - FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder)); - - DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>, - IsmRecord<WindowedValue<Long>>> doFnTester = DoFnTester.of( - new BatchViewAsMultimap.ToIsmMetadataRecordForSizeDoFn<Long, Long, IntervalWindow>( - windowCoder)); - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - IntervalWindow 
windowB = new IntervalWindow(new Instant(10), new Instant(20)); - IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); - - Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements = - ImmutableList.of( - KV.of(1, - (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of( - KV.of(windowA, 2L), - KV.of(windowA, 3L), - KV.of(windowB, 7L))), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), - (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of( - KV.of(windowC, 9L)))); - - // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBundle(inputElements), contains( - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 0L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 5L)), - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 0L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 7L)), - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 0L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 9L)) - )); - } - - @Test - public void testToIsmMetadataRecordForKeyDoFn() throws Exception { - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>(); - TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>(); - - Coder<Long> keyCoder = VarLongCoder.of(); - Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); - - IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of( - 1, - 2, - ImmutableList.<Coder<?>>of( - MetadataKeyCoder.of(keyCoder), - IntervalWindow.getCoder(), - BigEndianLongCoder.of()), - FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder)); - - DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>, - IsmRecord<WindowedValue<Long>>> doFnTester = DoFnTester.of( - new BatchViewAsMultimap.ToIsmMetadataRecordForKeyDoFn<Long, Long, 
IntervalWindow>( - keyCoder, windowCoder)); - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); - IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); - - Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements = - ImmutableList.of( - KV.of(1, - (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of( - KV.of(windowA, 2L), - // same window as previous - KV.of(windowA, 3L), - // different window as previous - KV.of(windowB, 3L))), - KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)), - (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of( - KV.of(windowC, 3L)))); - - // The order of the output elements is important relative to processing order - assertThat(doFnTester.processBundle(inputElements), contains( - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 1L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 2L)), - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 2L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)), - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 1L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)), - IsmRecord.<WindowedValue<Long>>meta( - ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 1L), - CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)) - )); - } - - @Test - public void testToMapDoFn() throws Exception { - Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); - - DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>, - IsmRecord<WindowedValue<TransformedMap<Long, - WindowedValue<Long>, - Long>>>> doFnTester = - DoFnTester.of(new BatchViewAsMap.ToMapDoFn<Long, Long, IntervalWindow>(windowCoder)); - - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new 
Instant(10)); - IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); - IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); - - Iterable<KV<Integer, - Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>> inputElements = - ImmutableList.of( - KV.of(1, - (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of( - KV.of(windowA, WindowedValue.of( - KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)), - KV.of(windowA, WindowedValue.of( - KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)), - KV.of(windowB, WindowedValue.of( - KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)), - KV.of(windowB, WindowedValue.of( - KV.of(3L, 31L), new Instant(15), windowB, PaneInfo.NO_FIRING)))), - KV.of(2, - (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of( - KV.of(windowC, WindowedValue.of( - KV.of(4L, 41L), new Instant(25), windowC, PaneInfo.NO_FIRING))))); - - // The order of the output elements is important relative to processing order - List<IsmRecord<WindowedValue<TransformedMap<Long, - WindowedValue<Long>, - Long>>>> output = - doFnTester.processBundle(inputElements); - assertEquals(3, output.size()); - Map<Long, Long> outputMap; - - outputMap = output.get(0).getValue().getValue(); - assertEquals(2, outputMap.size()); - assertEquals(ImmutableMap.of(1L, 11L, 2L, 21L), outputMap); - - outputMap = output.get(1).getValue().getValue(); - assertEquals(2, outputMap.size()); - assertEquals(ImmutableMap.of(2L, 21L, 3L, 31L), outputMap); - - outputMap = output.get(2).getValue().getValue(); - assertEquals(1, outputMap.size()); - assertEquals(ImmutableMap.of(4L, 41L), outputMap); - } - - @Test - public void testToMultimapDoFn() throws Exception { - Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); - - DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>, - IsmRecord<WindowedValue<TransformedMap<Long, - 
Iterable<WindowedValue<Long>>, - Iterable<Long>>>>> doFnTester = - DoFnTester.of( - new BatchViewAsMultimap.ToMultimapDoFn<Long, Long, IntervalWindow>(windowCoder)); - - - IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); - IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); - IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); - - Iterable<KV<Integer, - Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>> inputElements = - ImmutableList.of( - KV.of(1, - (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of( - KV.of(windowA, WindowedValue.of( - KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)), - KV.of(windowA, WindowedValue.of( - KV.of(1L, 12L), new Instant(5), windowA, PaneInfo.NO_FIRING)), - KV.of(windowA, WindowedValue.of( - KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)), - KV.of(windowB, WindowedValue.of( - KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)), - KV.of(windowB, WindowedValue.of( - KV.of(3L, 31L), new Instant(15), windowB, PaneInfo.NO_FIRING)))), - KV.of(2, - (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of( - KV.of(windowC, WindowedValue.of( - KV.of(4L, 41L), new Instant(25), windowC, PaneInfo.NO_FIRING))))); - - // The order of the output elements is important relative to processing order - List<IsmRecord<WindowedValue<TransformedMap<Long, - Iterable<WindowedValue<Long>>, - Iterable<Long>>>>> output = - doFnTester.processBundle(inputElements); - assertEquals(3, output.size()); - Map<Long, Iterable<Long>> outputMap; - - outputMap = output.get(0).getValue().getValue(); - assertEquals(2, outputMap.size()); - assertThat(outputMap.get(1L), containsInAnyOrder(11L, 12L)); - assertThat(outputMap.get(2L), containsInAnyOrder(21L)); - - outputMap = output.get(1).getValue().getValue(); - assertEquals(2, outputMap.size()); - assertThat(outputMap.get(2L), 
containsInAnyOrder(21L)); - assertThat(outputMap.get(3L), containsInAnyOrder(31L)); - - outputMap = output.get(2).getValue().getValue(); - assertEquals(1, outputMap.size()); - assertThat(outputMap.get(4L), containsInAnyOrder(41L)); - } - /** * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally * when the runner is successfully run.
