This is an automated email from the ASF dual-hosted git repository.

adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fb63520de91 Add tests for ProcessorManager (#16327)
fb63520de91 is described below

commit fb63520de917f81a597257d22ae9c95457503148
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue Apr 30 09:35:26 2024 +0530

    Add tests for ProcessorManager (#16327)
    
    * Add tests for ProcessorManager
---
 .../msq/querykit/ChainedProcessorManager.java      |   2 +-
 .../msq/querykit/ChainedProcessorManagerTest.java  | 337 +++++++++++++++++++++
 .../querykit/NonFailingWritableFrameChannel.java   |  66 ++++
 .../processor/FrameProcessorExecutorTest.java      |   2 +-
 .../manager/SequenceProcessorManagerTest.java      |   4 +-
 .../test/SimpleReturningFrameProcessor.java        |  45 +++
 .../test/SingleChannelFrameProcessor.java          |  84 +++++
 .../test/SingleRowWritingFrameProcessor.java       |  46 +++
 .../processor/test/TestFrameProcessorUtils.java    |  84 +++++
 9 files changed, 666 insertions(+), 4 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ChainedProcessorManager.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ChainedProcessorManager.java
index 687f9692cb3..22be0f3e64d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ChainedProcessorManager.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ChainedProcessorManager.java
@@ -111,7 +111,7 @@ public class ChainedProcessorManager<A, B, R> implements 
ProcessorManager<Object
     );
   }
 
-  private synchronized void checkFirstProcessorComplete()
+  private void checkFirstProcessorComplete()
   {
     if (first == null && (firstProcessorResult.size() == 
firstProcessorCount.get())) {
       restFuture.set(restFactory.apply(firstProcessorResult));
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/ChainedProcessorManagerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/ChainedProcessorManagerTest.java
new file mode 100644
index 00000000000..b8405b5f293
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/ChainedProcessorManagerTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.Bouncer;
+import org.apache.druid.frame.processor.FrameProcessorExecutorTest;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.manager.ProcessorManager;
+import org.apache.druid.frame.processor.manager.ProcessorManagers;
+import org.apache.druid.frame.processor.manager.SequenceProcessorManagerTest;
+import org.apache.druid.frame.processor.test.SimpleReturningFrameProcessor;
+import org.apache.druid.frame.processor.test.SingleChannelFrameProcessor;
+import org.apache.druid.frame.processor.test.SingleRowWritingFrameProcessor;
+import org.apache.druid.frame.processor.test.TestFrameProcessorUtils;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameCursor;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+/**
+ * Unit tests for {@link ChainedProcessorManager}.
+ */
+@RunWith(Parameterized.class)
+public class ChainedProcessorManagerTest extends 
FrameProcessorExecutorTest.BaseFrameProcessorExecutorTestSuite
+{
+  private final Bouncer bouncer;
+  private final int maxOutstandingProcessors;
+
+  private static final RowSignature ROW_SIGNATURE = RowSignature.builder()
+                                                               .addTimeColumn()
+                                                               .add("col", 
ColumnType.LONG)
+                                                               .build();
+
+  public ChainedProcessorManagerTest(int numThreads, int bouncerPoolSize, int 
maxOutstandingProcessors)
+  {
+    super(numThreads);
+    this.bouncer = bouncerPoolSize == Integer.MAX_VALUE ? Bouncer.unlimited() 
: new Bouncer(bouncerPoolSize);
+    this.maxOutstandingProcessors = maxOutstandingProcessors;
+  }
+
+  @Parameterized.Parameters(name = "numThreads = {0}, bouncerPoolSize = {1}, 
maxOutstandingProcessors = {2}")
+  public static Collection<Object[]> constructorFeeder()
+  {
+    final List<Object[]> constructors = new ArrayList<>();
+
+    for (int numThreads : new int[]{1, 3, 12}) {
+      for (int bouncerPoolSize : new int[]{1, 3, 12, Integer.MAX_VALUE}) {
+        for (int maxOutstandingProcessors : new int[]{1, 3, 12}) {
+          constructors.add(new Object[]{numThreads, bouncerPoolSize, 
maxOutstandingProcessors});
+        }
+      }
+    }
+
+    return constructors;
+  }
+
+  /**
+   * Simple functional test of the chained processor manager.
+   * The {@link ChainedProcessorManager#first} manager wraps a {@link 
SimpleReturningFrameProcessor} which returns the list of
+   * values. The {@link ChainedProcessorManager#restFactory} is a function 
that creates a
+   * {@link SingleRowWritingFrameProcessor} for each value.
+   */
+  @Test
+  public void test_simple_chained_processor() throws ExecutionException, 
InterruptedException
+  {
+    final ImmutableList<Long> expectedValues = ImmutableList.of(1L, 2L, 3L);
+    final BlockingQueueFrameChannel outputChannel = new 
BlockingQueueFrameChannel(3);
+    final SimpleReturningFrameProcessor<List<Long>> firstProcessor = new 
SimpleReturningFrameProcessor<>(expectedValues);
+
+    final ChainedProcessorManager<List<Long>, Long, Long> 
chainedProcessorManager = new ChainedProcessorManager<>(
+        ProcessorManagers.of(() -> firstProcessor),
+        (values) -> createNextProcessors(outputChannel.writable(), 
values.stream().flatMap(List::stream).collect(Collectors.toList()))
+    );
+
+    exec.runAllFully(
+        chainedProcessorManager,
+        maxOutstandingProcessors,
+        bouncer,
+        null
+    ).get();
+
+    final HashSet<Long> actualValues = new HashSet<>();
+    try (ReadableFrameChannel readable = outputChannel.readable()) {
+      while (readable.canRead()) {
+        actualValues.add(extractColumnValue(readable.read(), 1));
+      }
+    }
+    Assert.assertEquals(new HashSet<>(expectedValues), actualValues);
+  }
+
+  /**
+   * Test with multiple processors in {@link ChainedProcessorManager#first}.
+   */
+  @Test
+  public void test_multiple_processor_manager() throws ExecutionException, 
InterruptedException
+  {
+    final ImmutableList<Long> valueSet1 = ImmutableList.of(1L, 2L, 3L);
+    final ImmutableList<Long> valueSet2 = ImmutableList.of(4L, 5L, 6L);
+    final BlockingQueueFrameChannel outputChannel = new 
BlockingQueueFrameChannel(20);
+
+    final ChainedProcessorManager<List<Long>, Long, Long> 
chainedProcessorManager = new ChainedProcessorManager<>(
+        ProcessorManagers.of(
+            ImmutableList.of(
+                new SimpleReturningFrameProcessor<>(valueSet1),
+                new SimpleReturningFrameProcessor<>(valueSet2)
+            )
+        ),
+        (values) -> createNextProcessors(outputChannel.writable(), 
values.stream().flatMap(List::stream).collect(Collectors.toList()))
+    );
+
+    exec.runAllFully(
+        chainedProcessorManager,
+        maxOutstandingProcessors,
+        bouncer,
+        null
+    ).get();
+
+    final Set<Long> expectedValues = new HashSet<>();
+    expectedValues.addAll(valueSet1);
+    expectedValues.addAll(valueSet2);
+    final HashSet<Long> actualValues = new HashSet<>();
+    try (ReadableFrameChannel readable = outputChannel.readable()) {
+      while (readable.canRead()) {
+        actualValues.add(extractColumnValue(readable.read(), 1));
+      }
+    }
+    Assert.assertEquals(expectedValues, actualValues);
+  }
+
+  @Test
+  public void test_failing_processor_manager()
+  {
+    final ImmutableSet<Long> expectedValues = ImmutableSet.of();
+    final BlockingQueueFrameChannel outputChannel = new 
BlockingQueueFrameChannel(20);
+
+    final ChainedProcessorManager<List<Long>, Long, Long> 
chainedProcessorManager = new ChainedProcessorManager<>(
+        ProcessorManagers.of(
+            ImmutableList.of(
+                new SimpleReturningFrameProcessor<>(ImmutableList.of(4L, 5L, 
6L)),
+                new SequenceProcessorManagerTest.NilFrameProcessor<>()
+            )
+        ),
+        (values) -> createNextProcessors(
+            new NonFailingWritableFrameChannel(outputChannel.writable()),
+            values.stream().flatMap(List::stream).collect(Collectors.toList())
+        )
+    );
+
+    final ListenableFuture<Long> future = exec.runAllFully(
+        chainedProcessorManager,
+        maxOutstandingProcessors,
+        bouncer,
+        null
+    );
+
+    Assert.assertThrows(ExecutionException.class, future::get);
+
+    final HashSet<Long> actualValues = new HashSet<>();
+    try (ReadableFrameChannel readable = outputChannel.readable()) {
+      while (readable.canRead()) {
+        actualValues.add(extractColumnValue(readable.read(), 1));
+      }
+    }
+    Assert.assertEquals(expectedValues, actualValues);
+  }
+
+  @Test
+  public void test_chaining_processor_manager()
+  {
+    final Set<Long> values = LongStream.range(1, 
10).boxed().collect(Collectors.toSet());
+    runChainedExceptionTest(values, values, null);
+  }
+
+  @Test
+  public void test_chaining_processor_manager_with_exception()
+  {
+    final Set<Long> values = LongStream.range(1, 
10).boxed().collect(Collectors.toSet());
+
+    for (long failingValue = 1; failingValue < 10; failingValue++) {
+      final Set<Long> expectedValues = LongStream.range(1, 
failingValue).boxed().collect(Collectors.toSet());
+      runChainedExceptionTest(values, expectedValues, failingValue);
+    }
+  }
+
+  public void runChainedExceptionTest(Set<Long> values, Set<Long> 
expectedValues, Long failingValue)
+  {
+    final BlockingQueueFrameChannel outputChannel = new 
BlockingQueueFrameChannel(20);
+
+    final ProcessorManager<List<Object>, Long> chainedProcessorManager =
+        chainedProcessors(outputChannel.writable(), new ArrayList<>(values), 
failingValue);
+
+    final ListenableFuture<Long> future = exec.runAllFully(
+        chainedProcessorManager,
+        maxOutstandingProcessors,
+        bouncer,
+        null
+    );
+
+    try {
+      future.get();
+    }
+    catch (Exception ignored) {
+    }
+
+    final HashSet<Long> actualValues = new HashSet<>();
+    try (ReadableFrameChannel readable = outputChannel.readable()) {
+      while (readable.canRead()) {
+        actualValues.add(extractColumnValue(readable.read(), 1));
+      }
+    }
+    Assert.assertEquals(expectedValues, actualValues);
+  }
+
+  @SuppressWarnings("rawtypes")
+  private Long extractColumnValue(Frame frame, int columnNo)
+  {
+    FrameReader frameReader = FrameReader.create(ROW_SIGNATURE);
+    FrameCursor frameCursor = FrameProcessors.makeCursor(frame, frameReader);
+    ColumnSelectorFactory columnSelectorFactory = 
frameCursor.getColumnSelectorFactory();
+    ColumnValueSelector columnValueSelector = 
columnSelectorFactory.makeColumnValueSelector(ROW_SIGNATURE.getColumnName(columnNo));
+    return columnValueSelector.getLong();
+  }
+
+  private ProcessorManager<Long, Long> 
createNextProcessors(WritableFrameChannel frameChannel, List<Long> values)
+  {
+    List<SingleRowWritingFrameProcessor> processors = new ArrayList<>();
+    for (Long value : values) {
+      processors.add(new SingleRowWritingFrameProcessor(frameChannel, 
makeInputRow(ROW_SIGNATURE.getColumnName(1), value)));
+    }
+    return ProcessorManagers.of(processors);
+  }
+
+  private static InputRow makeInputRow(Object... kv)
+  {
+    final Map<String, Object> event = TestHelper.makeMap(true, kv);
+    return new MapBasedInputRow(0L, ImmutableList.copyOf(event.keySet()), 
event);
+  }
+
+  public static class PrintFirstAndReturnRestFrameProcessor extends 
SingleChannelFrameProcessor<List<Long>>
+  {
+    private final List<Long> valuesList;
+    private final Long failureValue;
+
+    public PrintFirstAndReturnRestFrameProcessor(WritableFrameChannel 
outputChannel, List<Long> valuesList, Long failureValue)
+    {
+      super(null, new NonFailingWritableFrameChannel(outputChannel));
+      this.valuesList = valuesList;
+      this.failureValue = failureValue;
+    }
+
+    @Override
+    public List<Long> doSimpleWork() throws IOException
+    {
+      Long firstValue = valuesList.get(0);
+      if (Objects.equals(firstValue, failureValue)) {
+        throw new RuntimeException();
+      }
+      InputRow inputRow = makeInputRow(ROW_SIGNATURE.getColumnName(1), 
firstValue);
+      
Iterables.getOnlyElement(outputChannels()).write(TestFrameProcessorUtils.toFrame(Collections.singletonList(inputRow)));
+      if (valuesList.size() == 1) {
+        return Collections.emptyList();
+      }
+      return valuesList.subList(1, valuesList.size());
+    }
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static ProcessorManager<List<Object>, Long> chainedProcessors(
+      WritableFrameChannel writableFrameChannel,
+      List<Long> values,
+      Long failureValue
+  )
+  {
+    if (values.isEmpty()) {
+      return ProcessorManagers.none();
+    }
+    ChainedProcessorManager<List<Long>, List<Long>, Long> processorManager = 
new ChainedProcessorManager<>(
+        ProcessorManagers.of(() -> new 
PrintFirstAndReturnRestFrameProcessor(writableFrameChannel, values, 
failureValue)),
+        returnedValues -> {
+          List<Long> lists = 
returnedValues.stream().flatMap(List::stream).collect(Collectors.toList());
+          return (ProcessorManager) chainedProcessors(
+              writableFrameChannel,
+              lists,
+              failureValue
+          );
+        }
+    );
+    return (ProcessorManager) processorManager;
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/NonFailingWritableFrameChannel.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/NonFailingWritableFrameChannel.java
new file mode 100644
index 00000000000..07b1dc2af7a
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/NonFailingWritableFrameChannel.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+public class NonFailingWritableFrameChannel implements WritableFrameChannel
+{
+  final WritableFrameChannel delegate;
+
+  public NonFailingWritableFrameChannel(WritableFrameChannel delegate)
+  {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void write(FrameWithPartition frameWithPartition) throws IOException
+  {
+    delegate.write(frameWithPartition);
+  }
+
+  @Override
+  public void fail(@Nullable Throwable cause)
+  {
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    delegate.close();
+  }
+
+  @Override
+  public boolean isClosed()
+  {
+    return delegate.isClosed();
+  }
+
+  @Override
+  public ListenableFuture<?> writabilityFuture()
+  {
+    return delegate.writabilityFuture();
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
index b31818964ea..55e5f7bb81c 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
@@ -398,7 +398,7 @@ public class FrameProcessorExecutorTest
     public final TemporaryFolder temporaryFolder = new TemporaryFolder();
     public final int numThreads;
 
-    FrameProcessorExecutor exec;
+    protected FrameProcessorExecutor exec;
 
     public BaseFrameProcessorExecutorTestSuite(int numThreads)
     {
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/manager/SequenceProcessorManagerTest.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/manager/SequenceProcessorManagerTest.java
index 59aa8d651ee..a1ce465540a 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/manager/SequenceProcessorManagerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/manager/SequenceProcessorManagerTest.java
@@ -140,7 +140,7 @@ public class SequenceProcessorManagerTest
     Assert.assertEquals(0, closed.get());
   }
 
-  private static class NilFrameProcessor implements FrameProcessor<Unit>
+  public static class NilFrameProcessor<T> implements FrameProcessor<T>
   {
     @Override
     public List<ReadableFrameChannel> inputChannels()
@@ -155,7 +155,7 @@ public class SequenceProcessorManagerTest
     }
 
     @Override
-    public ReturnOrAwait<Unit> runIncrementally(IntSet readableInputs)
+    public ReturnOrAwait<T> runIncrementally(IntSet readableInputs)
     {
       throw new UnsupportedOperationException();
     }
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/test/SimpleReturningFrameProcessor.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/test/SimpleReturningFrameProcessor.java
new file mode 100644
index 00000000000..d25fba2962f
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/test/SimpleReturningFrameProcessor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.test;
+
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.processor.FrameProcessor;
+
+/**
+ * A {@link FrameProcessor} which doesn't write anything to channels.
+ * Instead, it finishes on the first call to {@link 
#runIncrementally(IntSet)}, and returns the configured
+ * {@link #returnValue}.
+ */
+public class SimpleReturningFrameProcessor<T> extends 
SingleChannelFrameProcessor<T>
+{
+  private final T returnValue;
+
+  public SimpleReturningFrameProcessor(T returnValue)
+  {
+    super(null, null);
+    this.returnValue = returnValue;
+  }
+
+  @Override
+  public T doSimpleWork()
+  {
+    return returnValue;
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/test/SingleChannelFrameProcessor.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/test/SingleChannelFrameProcessor.java
new file mode 100644
index 00000000000..4d15bdbc16f
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/test/SingleChannelFrameProcessor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.test;
+
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Frame processor that writes to and reads from at most a single channel, 
and runs {@link #runIncrementally(IntSet)}
+ * at most once. When {@link #runIncrementally(IntSet)} is called, the class 
calls {@link #doSimpleWork()} and returns
+ * the value.
+ */
+public abstract class SingleChannelFrameProcessor<T> implements 
FrameProcessor<T>
+{
+  @Nullable
+  private final ReadableFrameChannel readableFrameChannel;
+  @Nullable
+  private final WritableFrameChannel writableFrameChannel;
+
+  public SingleChannelFrameProcessor(
+      @Nullable ReadableFrameChannel readableFrameChannel,
+      @Nullable WritableFrameChannel writableFrameChannel
+  )
+  {
+    this.readableFrameChannel = readableFrameChannel;
+    this.writableFrameChannel = writableFrameChannel;
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    if (readableFrameChannel == null) {
+      return Collections.emptyList();
+    }
+    return Collections.singletonList(readableFrameChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    if (writableFrameChannel == null) {
+      return Collections.emptyList();
+    }
+    return Collections.singletonList(writableFrameChannel);
+  }
+
+  @Override
+  public ReturnOrAwait<T> runIncrementally(IntSet readableInputs) throws 
IOException
+  {
+    return ReturnOrAwait.returnObject(doSimpleWork());
+  }
+
+  public abstract T doSimpleWork() throws IOException;
+
+  @Override
+  public void cleanup()
+  {
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/test/SingleRowWritingFrameProcessor.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/test/SingleRowWritingFrameProcessor.java
new file mode 100644
index 00000000000..ef6c80390de
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/test/SingleRowWritingFrameProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+
+import java.io.IOException;
+
+public class SingleRowWritingFrameProcessor extends 
SingleChannelFrameProcessor<Long>
+{
+  private final InputRow inputRow;
+
+  public SingleRowWritingFrameProcessor(WritableFrameChannel 
writableFrameChannel, InputRow inputRow)
+  {
+    super(null, writableFrameChannel);
+    this.inputRow = inputRow;
+  }
+
+  @Override
+  public Long doSimpleWork() throws IOException
+  {
+    final WritableFrameChannel outputChannel = 
Iterables.getOnlyElement(outputChannels());
+    
outputChannel.write(TestFrameProcessorUtils.toFrame(ImmutableList.of(inputRow)));
+    return 1L;
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
new file mode 100644
index 00000000000..0a25993a49a
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.test;
+
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.testutil.FrameSequenceBuilder;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+
+import java.util.List;
+
+public final class TestFrameProcessorUtils
+{
+  private TestFrameProcessorUtils()
+  {
+  }
+
+  public static StorageAdapter toStorageAdapter(List<InputRow> inputRows)
+  {
+    final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema(
+                0,
+                new TimestampSpec("__time", "millis", null),
+                Granularities.NONE,
+                VirtualColumns.EMPTY,
+                DimensionsSpec.builder().useSchemaDiscovery(true).build(),
+                new AggregatorFactory[0],
+                false
+            )
+        )
+        .setMaxRowCount(1000)
+        .build();
+
+    try {
+      for (InputRow inputRow : inputRows) {
+        index.add(inputRow);
+      }
+    }
+    catch (IndexSizeExceededException e) {
+      throw new RuntimeException(e);
+    }
+
+    return new IncrementalIndexStorageAdapter(index);
+  }
+
+  public static Frame toFrame(List<InputRow> inputRows)
+  {
+    final StorageAdapter storageAdapter = toStorageAdapter(inputRows);
+    return 
Iterables.getOnlyElement(FrameSequenceBuilder.fromAdapter(storageAdapter)
+                                                        
.frameType(FrameType.ROW_BASED)
+                                                        .frames()
+                                                        .toList());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to