gianm commented on code in PR #13506:
URL: https://github.com/apache/druid/pull/13506#discussion_r1114229933


##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessorTest.java:
##########
@@ -0,0 +1,987 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.common;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
+import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.ReadableNilFrameChannel;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.frame.testutil.FrameSequenceBuilder;
+import org.apache.druid.frame.testutil.FrameTestUtil;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.msq.test.LimitedFrameWriterFactory;
+import org.apache.druid.segment.RowBasedSegment;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.join.JoinTestHelper;
+import org.apache.druid.segment.join.JoinType;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.SegmentId;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(Parameterized.class)
+public class SortMergeJoinFrameProcessorTest extends 
InitializedNullHandlingTest
+{
+  private static final StagePartition STAGE_PARTITION = new StagePartition(new 
StageId("q", 0), 0);
+
+  private final int rowsPerInputFrame;
+  private final int rowsPerOutputFrame;
+
+  private FrameProcessorExecutor exec;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  public SortMergeJoinFrameProcessorTest(int rowsPerInputFrame, int 
rowsPerOutputFrame)
+  {
+    this.rowsPerInputFrame = rowsPerInputFrame;
+    this.rowsPerOutputFrame = rowsPerOutputFrame;
+  }
+
+  @Parameterized.Parameters(name = "rowsPerInputFrame = {0}, 
rowsPerOutputFrame = {1}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    final List<Object[]> constructors = new ArrayList<>();
+
+    for (final int rowsPerInputFrame : new int[]{1, 2, 7, Integer.MAX_VALUE}) {
+      for (final int rowsPerOutputFrame : new int[]{1, 2, 7, 
Integer.MAX_VALUE}) {
+        constructors.add(new Object[]{rowsPerInputFrame, rowsPerOutputFrame});
+      }
+    }
+
+    return constructors;
+  }
+
+  @Before
+  public void setUp()
+  {
+    exec = new 
FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    exec.getExecutorService().shutdownNow();
+    exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
+  }
+
+  @Test
+  public void testLeftJoinEmptyLeftSide() throws Exception
+  {
+    final ReadableInput factChannel = ReadableInput.channel(
+        ReadableNilFrameChannel.INSTANCE,
+        FrameReader.create(JoinTestHelper.FACT_SIGNATURE),
+        STAGE_PARTITION
+    );
+
+    final ReadableInput countriesChannel =
+        buildCountriesInput(ImmutableList.of(new KeyColumn("countryIsoCode", 
KeyOrder.ASCENDING)));
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+
+    final RowSignature joinSignature =
+        RowSignature.builder()
+                    .add("page", ColumnType.STRING)
+                    .add("countryIsoCode", ColumnType.STRING)
+                    .add("j0.countryIsoCode", ColumnType.STRING)
+                    .add("j0.countryName", ColumnType.STRING)
+                    .add("j0.countryNumber", ColumnType.LONG)
+                    .build();
+
+    final SortMergeJoinFrameProcessor processor = new 
SortMergeJoinFrameProcessor(
+        factChannel,
+        countriesChannel,
+        outputChannel.writable(),
+        makeFrameWriterFactory(joinSignature),
+        "j0.",
+        ImmutableList.of(
+            ImmutableList.of(new KeyColumn("countryIsoCode", 
KeyOrder.ASCENDING)),
+            ImmutableList.of(new KeyColumn("countryIsoCode", 
KeyOrder.ASCENDING))
+        ),
+        JoinType.LEFT
+    );
+
+    assertResult(processor, outputChannel.readable(), joinSignature, 
Collections.emptyList());
+  }
+
+  @Test
+  public void testLeftJoinEmptyRightSide() throws Exception
+  {
+    final ReadableInput factChannel = buildFactInput(
+        ImmutableList.of(
+            new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
+            new KeyColumn("page", KeyOrder.ASCENDING)
+        )
+    );
+
+
+    final ReadableInput countriesChannel = ReadableInput.channel(
+        ReadableNilFrameChannel.INSTANCE,
+        FrameReader.create(JoinTestHelper.COUNTRIES_SIGNATURE),
+        STAGE_PARTITION
+    );
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+
+    final RowSignature joinSignature =
+        RowSignature.builder()
+                    .add("page", ColumnType.STRING)
+                    .add("countryIsoCode", ColumnType.STRING)
+                    .add("j0.countryIsoCode", ColumnType.STRING)
+                    .add("j0.countryName", ColumnType.STRING)
+                    .add("j0.countryNumber", ColumnType.LONG)
+                    .build();
+
+    final SortMergeJoinFrameProcessor processor = new 
SortMergeJoinFrameProcessor(
+        factChannel,
+        countriesChannel,
+        outputChannel.writable(),
+        makeFrameWriterFactory(joinSignature),
+        "j0.",
+        ImmutableList.of(
+            ImmutableList.of(new KeyColumn("countryIsoCode", 
KeyOrder.ASCENDING)),
+            ImmutableList.of(new KeyColumn("countryIsoCode", 
KeyOrder.ASCENDING))
+        ),
+        JoinType.LEFT
+    );
+
+    final List<List<Object>> expectedRows = Arrays.asList(
+        Arrays.asList("Agama mossambica", null, null, null, null),
+        Arrays.asList("Apamea abruzzorum", null, null, null, null),
+        Arrays.asList("Atractus flammigerus", null, null, null, null),
+        Arrays.asList("Rallicula", null, null, null, null),
+        Arrays.asList("Talk:Oswald Tilghman", null, null, null, null),
+        Arrays.asList("Peremptory norm", "AU", null, null, null),
+        Arrays.asList("Didier Leclair", "CA", null, null, null),
+        Arrays.asList("Les Argonautes", "CA", null, null, null),
+        Arrays.asList("Sarah Michelle Gellar", "CA", null, null, null),
+        Arrays.asList("Golpe de Estado en Chile de 1973", "CL", null, null, 
null),
+        Arrays.asList("Diskussion:Sebastian Schulz", "DE", null, null, null),
+        Arrays.asList("Gabinete Ministerial de Rafael Correa", "EC", null, 
null, null),
+        Arrays.asList("Saison 9 de Secret Story", "FR", null, null, null),
+        Arrays.asList("Glasgow", "GB", null, null, null),
+        Arrays.asList("Giusy Ferreri discography", "IT", null, null, null),
+        Arrays.asList("Roma-Bangkok", "IT", null, null, null),
+        Arrays.asList("青野武", "JP", null, null, null),
+        Arrays.asList("유희왕 GX", "KR", null, null, null),
+        Arrays.asList("History of Fourems", "MMMM", null, null, null),
+        Arrays.asList("Mathis Bolly", "MX", null, null, null),
+        Arrays.asList("Orange Soda", "MatchNothing", null, null, null),
+        Arrays.asList("Алиса в Зазеркалье", "NO", null, null, null),
+        Arrays.asList("Cream Soda", "SU", null, null, null),
+        Arrays.asList("Wendigo", "SV", null, null, null),
+        Arrays.asList("Carlo Curti", "US", null, null, null),
+        Arrays.asList("DirecTV", "US", null, null, null),
+        Arrays.asList("Old Anatolian Turkish", "US", null, null, null),
+        Arrays.asList("Otjiwarongo Airport", "US", null, null, null),
+        Arrays.asList("President of India", "US", null, null, null)
+    );
+
+    assertResult(processor, outputChannel.readable(), joinSignature, 
expectedRows);
+  }
+
+  @Test
+  public void testInnerJoinEmptyRightSide() throws Exception
+  {
+    final ReadableInput factChannel = buildFactInput(
+        ImmutableList.of(
+            new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
+            new KeyColumn("page", KeyOrder.ASCENDING)
+        )
+    );
+
+    final ReadableInput countriesChannel = ReadableInput.channel(
+        ReadableNilFrameChannel.INSTANCE,
+        FrameReader.create(JoinTestHelper.COUNTRIES_SIGNATURE),
+        STAGE_PARTITION
+    );
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+
+    final RowSignature joinSignature =
+        RowSignature.builder()
+                    .add("page", ColumnType.STRING)
+                    .add("countryIsoCode", ColumnType.STRING)
+                    .add("j0.countryIsoCode", ColumnType.STRING)
+                    .add("j0.countryName", ColumnType.STRING)
+                    .add("j0.countryNumber", ColumnType.LONG)
+                    .build();
+
+    final SortMergeJoinFrameProcessor processor = new 
SortMergeJoinFrameProcessor(
+        factChannel,
+        countriesChannel,
+        outputChannel.writable(),
+        makeFrameWriterFactory(joinSignature),
+        "j0.",
+        ImmutableList.of(
+            ImmutableList.of(new KeyColumn("countryIsoCode", 
KeyOrder.ASCENDING)),
+            ImmutableList.of(new KeyColumn("countryIsoCode", 
KeyOrder.ASCENDING))
+        ),
+        JoinType.INNER
+    );
+
+    assertResult(processor, outputChannel.readable(), joinSignature, 
Collections.emptyList());
+  }
+
+  @Test
+  public void testLeftJoinCountryIsoCode() throws Exception
+  {
+    final ReadableInput factChannel = buildFactInput(
+        ImmutableList.of(
+            new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
+            new KeyColumn("page", KeyOrder.ASCENDING)
+        )
+    );
+
+    final ReadableInput countriesChannel =
+        buildCountriesInput(ImmutableList.of(new KeyColumn("countryIsoCode", 
KeyOrder.ASCENDING)));
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+
+    final RowSignature joinSignature =
+        RowSignature.builder()
+                    .add("page", ColumnType.STRING)
+                    .add("countryIsoCode", ColumnType.STRING)
+                    .add("j0.countryIsoCode", ColumnType.STRING)
+                    .add("j0.countryName", ColumnType.STRING)
+                    .add("j0.countryNumber", ColumnType.LONG)
+                    .build();
+
+    final SortMergeJoinFrameProcessor processor = new 
SortMergeJoinFrameProcessor(
+        factChannel,
+        countriesChannel,
+        outputChannel.writable(),
+        makeFrameWriterFactory(joinSignature),
+        "j0.",
+        ImmutableList.of(
+            ImmutableList.of(new KeyColumn("countryIsoCode", 
KeyOrder.ASCENDING)),
+            ImmutableList.of(new KeyColumn("countryIsoCode", 
KeyOrder.ASCENDING))
+        ),
+        JoinType.LEFT
+    );
+
+    final List<List<Object>> expectedRows = Arrays.asList(
+        Arrays.asList("Agama mossambica", null, null, null, null),
+        Arrays.asList("Apamea abruzzorum", null, null, null, null),
+        Arrays.asList("Atractus flammigerus", null, null, null, null),
+        Arrays.asList("Rallicula", null, null, null, null),
+        Arrays.asList("Talk:Oswald Tilghman", null, null, null, null),
+        Arrays.asList("Peremptory norm", "AU", "AU", "Australia", 0L),
+        Arrays.asList("Didier Leclair", "CA", "CA", "Canada", 1L),
+        Arrays.asList("Les Argonautes", "CA", "CA", "Canada", 1L),
+        Arrays.asList("Sarah Michelle Gellar", "CA", "CA", "Canada", 1L),
+        Arrays.asList("Golpe de Estado en Chile de 1973", "CL", "CL", "Chile", 
2L),
+        Arrays.asList("Diskussion:Sebastian Schulz", "DE", "DE", "Germany", 
3L),
+        Arrays.asList("Gabinete Ministerial de Rafael Correa", "EC", "EC", 
"Ecuador", 4L),
+        Arrays.asList("Saison 9 de Secret Story", "FR", "FR", "France", 5L),
+        Arrays.asList("Glasgow", "GB", "GB", "United Kingdom", 6L),
+        Arrays.asList("Giusy Ferreri discography", "IT", "IT", "Italy", 7L),
+        Arrays.asList("Roma-Bangkok", "IT", "IT", "Italy", 7L),
+        Arrays.asList("青野武", "JP", "JP", "Japan", 8L),
+        Arrays.asList("유희왕 GX", "KR", "KR", "Republic of Korea", 9L),
+        Arrays.asList("History of Fourems", "MMMM", "MMMM", "Fourems", 205L),
+        Arrays.asList("Mathis Bolly", "MX", "MX", "Mexico", 10L),
+        Arrays.asList("Orange Soda", "MatchNothing", null, null, null),
+        Arrays.asList("Алиса в Зазеркалье", "NO", "NO", "Norway", 11L),
+        Arrays.asList("Cream Soda", "SU", "SU", "States United", 15L),
+        Arrays.asList("Wendigo", "SV", "SV", "El Salvador", 12L),
+        Arrays.asList("Carlo Curti", "US", "US", "United States", 13L),
+        Arrays.asList("DirecTV", "US", "US", "United States", 13L),
+        Arrays.asList("Old Anatolian Turkish", "US", "US", "United States", 
13L),
+        Arrays.asList("Otjiwarongo Airport", "US", "US", "United States", 13L),
+        Arrays.asList("President of India", "US", "US", "United States", 13L)
+    );
+
+    assertResult(processor, outputChannel.readable(), joinSignature, 
expectedRows);
+  }
+
+  @Test
+  public void testCrossJoin() throws Exception
+  {
+    final ReadableInput factChannel = buildFactInput(
+        ImmutableList.of(
+            new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
+            new KeyColumn("page", KeyOrder.ASCENDING)
+        )
+    );
+
+    final ReadableInput countriesChannel = makeChannelFromResourceWithLimit(
+        JoinTestHelper.COUNTRIES_RESOURCE,
+        JoinTestHelper.COUNTRIES_SIGNATURE,
+        ImmutableList.of(new KeyColumn("countryIsoCode", KeyOrder.ASCENDING)),
+        2
+    );
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+
+    final RowSignature joinSignature =
+        RowSignature.builder()
+                    .add("j0.page", ColumnType.STRING)
+                    .add("countryIsoCode", ColumnType.STRING)
+                    .build();
+
+    final SortMergeJoinFrameProcessor processor = new 
SortMergeJoinFrameProcessor(
+        countriesChannel,
+        factChannel,
+        outputChannel.writable(),
+        makeFrameWriterFactory(joinSignature),
+        "j0.",
+        ImmutableList.of(Collections.emptyList(), Collections.emptyList()),
+        JoinType.INNER
+    );
+
+    final List<List<Object>> expectedRows = Arrays.asList(
+        Arrays.asList("Agama mossambica", "AU"),
+        Arrays.asList("Agama mossambica", "CA"),
+        Arrays.asList("Apamea abruzzorum", "AU"),
+        Arrays.asList("Apamea abruzzorum", "CA"),
+        Arrays.asList("Atractus flammigerus", "AU"),
+        Arrays.asList("Atractus flammigerus", "CA"),
+        Arrays.asList("Rallicula", "AU"),
+        Arrays.asList("Rallicula", "CA"),
+        Arrays.asList("Talk:Oswald Tilghman", "AU"),
+        Arrays.asList("Talk:Oswald Tilghman", "CA"),
+        Arrays.asList("Peremptory norm", "AU"),
+        Arrays.asList("Peremptory norm", "CA"),
+        Arrays.asList("Didier Leclair", "AU"),
+        Arrays.asList("Didier Leclair", "CA"),
+        Arrays.asList("Les Argonautes", "AU"),
+        Arrays.asList("Les Argonautes", "CA"),
+        Arrays.asList("Sarah Michelle Gellar", "AU"),
+        Arrays.asList("Sarah Michelle Gellar", "CA"),
+        Arrays.asList("Golpe de Estado en Chile de 1973", "AU"),
+        Arrays.asList("Golpe de Estado en Chile de 1973", "CA"),
+        Arrays.asList("Diskussion:Sebastian Schulz", "AU"),
+        Arrays.asList("Diskussion:Sebastian Schulz", "CA"),
+        Arrays.asList("Gabinete Ministerial de Rafael Correa", "AU"),
+        Arrays.asList("Gabinete Ministerial de Rafael Correa", "CA"),
+        Arrays.asList("Saison 9 de Secret Story", "AU"),
+        Arrays.asList("Saison 9 de Secret Story", "CA"),
+        Arrays.asList("Glasgow", "AU"),
+        Arrays.asList("Glasgow", "CA"),
+        Arrays.asList("Giusy Ferreri discography", "AU"),
+        Arrays.asList("Giusy Ferreri discography", "CA"),
+        Arrays.asList("Roma-Bangkok", "AU"),
+        Arrays.asList("Roma-Bangkok", "CA"),
+        Arrays.asList("青野武", "AU"),
+        Arrays.asList("青野武", "CA"),
+        Arrays.asList("유희왕 GX", "AU"),
+        Arrays.asList("유희왕 GX", "CA"),
+        Arrays.asList("History of Fourems", "AU"),
+        Arrays.asList("History of Fourems", "CA"),
+        Arrays.asList("Mathis Bolly", "AU"),
+        Arrays.asList("Mathis Bolly", "CA"),
+        Arrays.asList("Orange Soda", "AU"),
+        Arrays.asList("Orange Soda", "CA"),
+        Arrays.asList("Алиса в Зазеркалье", "AU"),
+        Arrays.asList("Алиса в Зазеркалье", "CA"),
+        Arrays.asList("Cream Soda", "AU"),
+        Arrays.asList("Cream Soda", "CA"),
+        Arrays.asList("Wendigo", "AU"),
+        Arrays.asList("Wendigo", "CA"),
+        Arrays.asList("Carlo Curti", "AU"),
+        Arrays.asList("Carlo Curti", "CA"),
+        Arrays.asList("DirecTV", "AU"),
+        Arrays.asList("DirecTV", "CA"),
+        Arrays.asList("Old Anatolian Turkish", "AU"),
+        Arrays.asList("Old Anatolian Turkish", "CA"),
+        Arrays.asList("Otjiwarongo Airport", "AU"),
+        Arrays.asList("Otjiwarongo Airport", "CA"),
+        Arrays.asList("President of India", "AU"),
+        Arrays.asList("President of India", "CA")
+    );
+
+    assertResult(processor, outputChannel.readable(), joinSignature, 
expectedRows);
+  }
+
+  @Test
+  public void testLeftJoinRegions() throws Exception
+  {
+    final ReadableInput factChannel =
+        buildFactInput(
+            ImmutableList.of(
+                new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
+                new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
+                new KeyColumn("page", KeyOrder.ASCENDING)
+            )
+        );
+
+    final ReadableInput regionsChannel =
+        buildRegionsInput(
+            ImmutableList.of(
+                new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
+                new KeyColumn("regionIsoCode", KeyOrder.ASCENDING)
+            )
+        );
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+
+    final RowSignature joinSignature =
+        RowSignature.builder()
+                    .add("page", ColumnType.STRING)
+                    .add("j0.regionName", ColumnType.STRING)
+                    .add("countryIsoCode", ColumnType.STRING)
+                    .build();
+
+    final SortMergeJoinFrameProcessor processor = new 
SortMergeJoinFrameProcessor(
+        factChannel,
+        regionsChannel,
+        outputChannel.writable(),
+        makeFrameWriterFactory(joinSignature),
+        "j0.",
+        ImmutableList.of(
+            ImmutableList.of(
+                new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
+                new KeyColumn("regionIsoCode", KeyOrder.ASCENDING)
+            ),
+            ImmutableList.of(
+                new KeyColumn("countryIsoCode", KeyOrder.ASCENDING),
+                new KeyColumn("regionIsoCode", KeyOrder.ASCENDING)
+            )
+        ),
+        JoinType.LEFT
+    );
+
+    final List<List<Object>> expectedRows = Arrays.asList(
+        Arrays.asList("Agama mossambica", null, null),
+        Arrays.asList("Apamea abruzzorum", null, null),
+        Arrays.asList("Atractus flammigerus", null, null),
+        Arrays.asList("Rallicula", null, null),
+        Arrays.asList("Talk:Oswald Tilghman", null, null),
+        Arrays.asList("Peremptory norm", "New South Wales", "AU"),
+        Arrays.asList("Didier Leclair", "Ontario", "CA"),
+        Arrays.asList("Sarah Michelle Gellar", "Ontario", "CA"),
+        Arrays.asList("Les Argonautes", "Quebec", "CA"),
+        Arrays.asList("Golpe de Estado en Chile de 1973", "Santiago 
Metropolitan", "CL"),
+        Arrays.asList("Diskussion:Sebastian Schulz", "Hesse", "DE"),
+        Arrays.asList("Gabinete Ministerial de Rafael Correa", "Provincia del 
Guayas", "EC"),
+        Arrays.asList("Saison 9 de Secret Story", "Val d'Oise", "FR"),
+        Arrays.asList("Glasgow", "Kingston upon Hull", "GB"),
+        Arrays.asList("Giusy Ferreri discography", "Provincia di Varese", 
"IT"),
+        Arrays.asList("Roma-Bangkok", "Provincia di Varese", "IT"),
+        Arrays.asList("青野武", "Tōkyō", "JP"),
+        Arrays.asList("유희왕 GX", "Seoul", "KR"),
+        Arrays.asList("History of Fourems", "Fourems Province", "MMMM"),
+        Arrays.asList("Mathis Bolly", "Mexico City", "MX"),
+        Arrays.asList("Orange Soda", null, "MatchNothing"),
+        Arrays.asList("Алиса в Зазеркалье", "Finnmark Fylke", "NO"),
+        Arrays.asList("Cream Soda", "Ainigriv", "SU"),
+        Arrays.asList("Wendigo", "Departamento de San Salvador", "SV"),
+        Arrays.asList("Carlo Curti", "California", "US"),
+        Arrays.asList("Otjiwarongo Airport", "California", "US"),
+        Arrays.asList("President of India", "California", "US"),
+        Arrays.asList("DirecTV", "North Carolina", "US"),
+        Arrays.asList("Old Anatolian Turkish", "Virginia", "US")
+    );
+
+    assertResult(processor, outputChannel.readable(), joinSignature, 
expectedRows);
+  }
+
+  @Test
+  public void testRightJoinRegionCodeOnly() throws Exception
+  {
+    // This join generates duplicates.
+
+    final ReadableInput factChannel =
+        buildFactInput(
+            ImmutableList.of(
+                new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
+                new KeyColumn("page", KeyOrder.ASCENDING)
+            )
+        );
+
+    final ReadableInput regionsChannel =
+        buildRegionsInput(
+            ImmutableList.of(
+                new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
+                new KeyColumn("countryIsoCode", KeyOrder.ASCENDING)
+            )
+        );
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+
+    final RowSignature joinSignature =
+        RowSignature.builder()
+                    .add("j0.page", ColumnType.STRING)
+                    .add("regionName", ColumnType.STRING)
+                    .add("j0.countryIsoCode", ColumnType.STRING)
+                    .build();
+
+    final SortMergeJoinFrameProcessor processor = new 
SortMergeJoinFrameProcessor(
+        regionsChannel,
+        factChannel,
+        outputChannel.writable(),
+        makeFrameWriterFactory(joinSignature),
+        "j0.",
+        ImmutableList.of(
+            ImmutableList.of(new KeyColumn("regionIsoCode", 
KeyOrder.ASCENDING)),
+            ImmutableList.of(new KeyColumn("regionIsoCode", 
KeyOrder.ASCENDING))
+        ),
+        JoinType.RIGHT
+    );
+
+    final List<List<Object>> expectedRows = Arrays.asList(
+        Arrays.asList("Agama mossambica", null, null),
+        Arrays.asList("Apamea abruzzorum", null, null),
+        Arrays.asList("Atractus flammigerus", null, null),
+        Arrays.asList("Rallicula", null, null),
+        Arrays.asList("Talk:Oswald Tilghman", null, null),
+        Arrays.asList("유희왕 GX", "Seoul", "KR"),
+        Arrays.asList("青野武", "Tōkyō", "JP"),
+        Arrays.asList("Алиса в Зазеркалье", "Finnmark Fylke", "NO"),
+        Arrays.asList("Saison 9 de Secret Story", "Val d'Oise", "FR"),
+        Arrays.asList("Cream Soda", "Ainigriv", "SU"),
+        Arrays.asList("Carlo Curti", "California", "US"),
+        Arrays.asList("Otjiwarongo Airport", "California", "US"),
+        Arrays.asList("President of India", "California", "US"),
+        Arrays.asList("Mathis Bolly", "Mexico City", "MX"),
+        Arrays.asList("Gabinete Ministerial de Rafael Correa", "Provincia del 
Guayas", "EC"),
+        Arrays.asList("Diskussion:Sebastian Schulz", "Hesse", "DE"),
+        Arrays.asList("Glasgow", "Kingston upon Hull", "GB"),
+        Arrays.asList("History of Fourems", "Fourems Province", "MMMM"),
+        Arrays.asList("Orange Soda", null, "MatchNothing"),
+        Arrays.asList("DirecTV", "North Carolina", "US"),
+        Arrays.asList("Peremptory norm", "New South Wales", "AU"),
+        Arrays.asList("Didier Leclair", "Ontario", "CA"),
+        Arrays.asList("Sarah Michelle Gellar", "Ontario", "CA"),
+        Arrays.asList("Les Argonautes", "Quebec", "CA"),
+        Arrays.asList("Golpe de Estado en Chile de 1973", "Santiago 
Metropolitan", "CL"),
+        Arrays.asList("Wendigo", "Departamento de San Salvador", "SV"),
+        Arrays.asList("Giusy Ferreri discography", "Provincia di Varese", 
"IT"),
+        Arrays.asList("Giusy Ferreri discography", "Virginia", "IT"),
+        Arrays.asList("Old Anatolian Turkish", "Provincia di Varese", "US"),
+        Arrays.asList("Old Anatolian Turkish", "Virginia", "US"),
+        Arrays.asList("Roma-Bangkok", "Provincia di Varese", "IT"),
+        Arrays.asList("Roma-Bangkok", "Virginia", "IT")
+    );
+
+    assertResult(processor, outputChannel.readable(), joinSignature, 
expectedRows);
+  }
+
+  @Test
+  public void testFullOuterJoinRegionCodeOnly() throws Exception
+  {
+    // This join generates duplicates.
+
+    final ReadableInput factChannel =
+        buildFactInput(
+            ImmutableList.of(
+                new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
+                new KeyColumn("page", KeyOrder.ASCENDING)
+            )
+        );
+
+    final ReadableInput regionsChannel =
+        buildRegionsInput(
+            ImmutableList.of(
+                new KeyColumn("regionIsoCode", KeyOrder.ASCENDING),
+                new KeyColumn("countryIsoCode", KeyOrder.ASCENDING)
+            )
+        );
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+
+    final RowSignature joinSignature =
+        RowSignature.builder()
+                    .add("j0.page", ColumnType.STRING)
+                    .add("regionName", ColumnType.STRING)
+                    .add("j0.countryIsoCode", ColumnType.STRING)
+                    .build();
+
+    final SortMergeJoinFrameProcessor processor = new 
SortMergeJoinFrameProcessor(
+        regionsChannel,
+        factChannel,
+        outputChannel.writable(),
+        makeFrameWriterFactory(joinSignature),
+        "j0.",
+        ImmutableList.of(
+            ImmutableList.of(new KeyColumn("regionIsoCode", 
KeyOrder.ASCENDING)),
+            ImmutableList.of(new KeyColumn("regionIsoCode", 
KeyOrder.ASCENDING))
+        ),
+        JoinType.FULL
+    );
+
+    final List<List<Object>> expectedRows = Arrays.asList(
+        Arrays.asList(null, "Nulland", null),
+        Arrays.asList("Agama mossambica", null, null),
+        Arrays.asList("Apamea abruzzorum", null, null),
+        Arrays.asList("Atractus flammigerus", null, null),
+        Arrays.asList("Rallicula", null, null),
+        Arrays.asList("Talk:Oswald Tilghman", null, null),
+        Arrays.asList("유희왕 GX", "Seoul", "KR"),
+        Arrays.asList("青野武", "Tōkyō", "JP"),
+        Arrays.asList("Алиса в Зазеркалье", "Finnmark Fylke", "NO"),
+        Arrays.asList("Saison 9 de Secret Story", "Val d'Oise", "FR"),
+        Arrays.asList(null, "Foureis Province", null),
+        Arrays.asList("Cream Soda", "Ainigriv", "SU"),
+        Arrays.asList("Carlo Curti", "California", "US"),
+        Arrays.asList("Otjiwarongo Airport", "California", "US"),
+        Arrays.asList("President of India", "California", "US"),
+        Arrays.asList("Mathis Bolly", "Mexico City", "MX"),
+        Arrays.asList("Gabinete Ministerial de Rafael Correa", "Provincia del 
Guayas", "EC"),
+        Arrays.asList("Diskussion:Sebastian Schulz", "Hesse", "DE"),
+        Arrays.asList("Glasgow", "Kingston upon Hull", "GB"),
+        Arrays.asList("History of Fourems", "Fourems Province", "MMMM"),
+        Arrays.asList("Orange Soda", null, "MatchNothing"),
+        Arrays.asList("DirecTV", "North Carolina", "US"),
+        Arrays.asList("Peremptory norm", "New South Wales", "AU"),
+        Arrays.asList("Didier Leclair", "Ontario", "CA"),
+        Arrays.asList("Sarah Michelle Gellar", "Ontario", "CA"),
+        Arrays.asList("Les Argonautes", "Quebec", "CA"),
+        Arrays.asList("Golpe de Estado en Chile de 1973", "Santiago 
Metropolitan", "CL"),
+        Arrays.asList("Wendigo", "Departamento de San Salvador", "SV"),
+        Arrays.asList("Giusy Ferreri discography", "Provincia di Varese", 
"IT"),
+        Arrays.asList("Giusy Ferreri discography", "Virginia", "IT"),
+        Arrays.asList("Old Anatolian Turkish", "Provincia di Varese", "US"),
+        Arrays.asList("Old Anatolian Turkish", "Virginia", "US"),
+        Arrays.asList("Roma-Bangkok", "Provincia di Varese", "IT"),
+        Arrays.asList("Roma-Bangkok", "Virginia", "IT"),
+        Arrays.asList(null, "Usca City", null)
+    );
+
+    assertResult(processor, outputChannel.readable(), joinSignature, 
expectedRows);
+  }
+
+  @Test
+  public void testLeftJoinCountryNumber() throws Exception
+  {
+    final ReadableInput factChannel = buildFactInput(
+        ImmutableList.of(
+            new KeyColumn("countryNumber", KeyOrder.ASCENDING),
+            new KeyColumn("page", KeyOrder.ASCENDING)
+        )
+    );
+
+    final ReadableInput countriesChannel =
+        buildCountriesInput(ImmutableList.of(new KeyColumn("countryNumber", 
KeyOrder.ASCENDING)));
+
+    final BlockingQueueFrameChannel outputChannel = 
BlockingQueueFrameChannel.minimal();
+
+    final RowSignature joinSignature =
+        RowSignature.builder()
+                    .add("page", ColumnType.STRING)
+                    .add("countryIsoCode", ColumnType.STRING)
+                    .add("j0.countryIsoCode", ColumnType.STRING)
+                    .add("j0.countryName", ColumnType.STRING)
+                    .add("j0.countryNumber", ColumnType.LONG)
+                    .build();
+
+    final SortMergeJoinFrameProcessor processor = new 
SortMergeJoinFrameProcessor(
+        factChannel,
+        countriesChannel,
+        outputChannel.writable(),
+        makeFrameWriterFactory(joinSignature),
+        "j0.",
+        ImmutableList.of(
+            ImmutableList.of(new KeyColumn("countryNumber", 
KeyOrder.ASCENDING)),
+            ImmutableList.of(new KeyColumn("countryNumber", 
KeyOrder.ASCENDING))
+        ),
+        JoinType.LEFT
+    );
+
+    final String countryCodeForNull;
+    final String countryNameForNull;
+    final Long countryNumberForNull;
+
+    if (NullHandling.sqlCompatible()) {
+      countryCodeForNull = null;
+      countryNameForNull = null;
+      countryNumberForNull = null;
+    } else {
+      // In default-value mode, null country number from the left-hand table converts to zero, which matches Australia.
+      countryCodeForNull = "AU";
+      countryNameForNull = "Australia";
+      countryNumberForNull = 0L;

Review Comment:
   Sure. That said, the comment is accurate as-is: the null from the left-hand
   side really does convert to zero, because the left-hand side isn't using
   nested columns. Any suggestions on how to reword the comment to make it
   clearer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to