This is an automated email from the ASF dual-hosted git repository.
adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fb63520de91 Add tests for ProcessorManager (#16327)
fb63520de91 is described below
commit fb63520de917f81a597257d22ae9c95457503148
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue Apr 30 09:35:26 2024 +0530
Add tests for ProcessorManager (#16327)
* Add tests for ProcessorManager
---
.../msq/querykit/ChainedProcessorManager.java | 2 +-
.../msq/querykit/ChainedProcessorManagerTest.java | 337 +++++++++++++++++++++
.../querykit/NonFailingWritableFrameChannel.java | 66 ++++
.../processor/FrameProcessorExecutorTest.java | 2 +-
.../manager/SequenceProcessorManagerTest.java | 4 +-
.../test/SimpleReturningFrameProcessor.java | 45 +++
.../test/SingleChannelFrameProcessor.java | 84 +++++
.../test/SingleRowWritingFrameProcessor.java | 46 +++
.../processor/test/TestFrameProcessorUtils.java | 84 +++++
9 files changed, 666 insertions(+), 4 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ChainedProcessorManager.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ChainedProcessorManager.java
index 687f9692cb3..22be0f3e64d 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ChainedProcessorManager.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ChainedProcessorManager.java
@@ -111,7 +111,7 @@ public class ChainedProcessorManager<A, B, R> implements
ProcessorManager<Object
);
}
- private synchronized void checkFirstProcessorComplete()
+ private void checkFirstProcessorComplete()
{
if (first == null && (firstProcessorResult.size() ==
firstProcessorCount.get())) {
restFuture.set(restFactory.apply(firstProcessorResult));
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/ChainedProcessorManagerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/ChainedProcessorManagerTest.java
new file mode 100644
index 00000000000..b8405b5f293
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/ChainedProcessorManagerTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.Bouncer;
+import org.apache.druid.frame.processor.FrameProcessorExecutorTest;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.manager.ProcessorManager;
+import org.apache.druid.frame.processor.manager.ProcessorManagers;
+import org.apache.druid.frame.processor.manager.SequenceProcessorManagerTest;
+import org.apache.druid.frame.processor.test.SimpleReturningFrameProcessor;
+import org.apache.druid.frame.processor.test.SingleChannelFrameProcessor;
+import org.apache.druid.frame.processor.test.SingleRowWritingFrameProcessor;
+import org.apache.druid.frame.processor.test.TestFrameProcessorUtils;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameCursor;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+/**
+ * Unit tests for {@link ChainedProcessorManager}.
+ */
+@RunWith(Parameterized.class)
+public class ChainedProcessorManagerTest extends
FrameProcessorExecutorTest.BaseFrameProcessorExecutorTestSuite
+{
+ private final Bouncer bouncer;
+ private final int maxOutstandingProcessors;
+
+ private static final RowSignature ROW_SIGNATURE = RowSignature.builder()
+ .addTimeColumn()
+ .add("col",
ColumnType.LONG)
+ .build();
+
+  /**
+   * Builds one parameterized instance of the test.
+   *
+   * @param numThreads               forwarded to the base test suite constructor
+   * @param bouncerPoolSize          size of the {@link Bouncer} to use; {@link Integer#MAX_VALUE} selects
+   *                                 {@link Bouncer#unlimited()}
+   * @param maxOutstandingProcessors limit passed to every {@code exec.runAllFully} call made by the tests
+   */
+  public ChainedProcessorManagerTest(int numThreads, int bouncerPoolSize, int maxOutstandingProcessors)
+  {
+    super(numThreads);
+    this.maxOutstandingProcessors = maxOutstandingProcessors;
+    if (bouncerPoolSize == Integer.MAX_VALUE) {
+      this.bouncer = Bouncer.unlimited();
+    } else {
+      this.bouncer = new Bouncer(bouncerPoolSize);
+    }
+  }
+
+  /**
+   * Supplies the constructor arguments for the parameterized runner: the cross product of the thread counts,
+   * bouncer pool sizes and outstanding-processor limits listed below.
+   */
+  @Parameterized.Parameters(name = "numThreads = {0}, bouncerPoolSize = {1}, maxOutstandingProcessors = {2}")
+  public static Collection<Object[]> constructorFeeder()
+  {
+    final int[] threadCounts = {1, 3, 12};
+    final int[] bouncerPoolSizes = {1, 3, 12, Integer.MAX_VALUE};
+    final int[] outstandingLimits = {1, 3, 12};
+
+    final List<Object[]> parameterSets = new ArrayList<>();
+    for (int threadCount : threadCounts) {
+      for (int poolSize : bouncerPoolSizes) {
+        for (int outstandingLimit : outstandingLimits) {
+          parameterSets.add(new Object[]{threadCount, poolSize, outstandingLimit});
+        }
+      }
+    }
+    return parameterSets;
+  }
+
+  /**
+   * Happy-path test with a single first-stage processor.
+   * The {@link ChainedProcessorManager#first} manager holds one {@link SimpleReturningFrameProcessor} that returns
+   * a list of values; the {@link ChainedProcessorManager#restFactory} creates one
+   * {@link SingleRowWritingFrameProcessor} per returned value. After the run, the output channel must contain
+   * exactly those values.
+   */
+  @Test
+  public void test_simple_chained_processor() throws ExecutionException, InterruptedException
+  {
+    final ImmutableList<Long> expectedValues = ImmutableList.of(1L, 2L, 3L);
+    final BlockingQueueFrameChannel outputChannel = new BlockingQueueFrameChannel(3);
+    final SimpleReturningFrameProcessor<List<Long>> firstProcessor =
+        new SimpleReturningFrameProcessor<>(expectedValues);
+
+    final ChainedProcessorManager<List<Long>, Long, Long> manager = new ChainedProcessorManager<>(
+        ProcessorManagers.of(() -> firstProcessor),
+        firstStageResults -> {
+          final WritableFrameChannel sink = outputChannel.writable();
+          final List<Long> flattened =
+              firstStageResults.stream().flatMap(List::stream).collect(Collectors.toList());
+          return createNextProcessors(sink, flattened);
+        }
+    );
+
+    final ListenableFuture<Long> runFuture =
+        exec.runAllFully(manager, maxOutstandingProcessors, bouncer, null);
+    runFuture.get();
+
+    // Drain whatever was written and compare as sets, since the write order is not asserted.
+    final Set<Long> actualValues = new HashSet<>();
+    try (ReadableFrameChannel readable = outputChannel.readable()) {
+      while (readable.canRead()) {
+        final Frame frame = readable.read();
+        actualValues.add(extractColumnValue(frame, 1));
+      }
+    }
+    Assert.assertEquals(new HashSet<>(expectedValues), actualValues);
+  }
+
+  /**
+   * Test with two processors in {@link ChainedProcessorManager#first}. Each first-stage processor returns its own
+   * list of values, and the union of both lists must end up in the output channel.
+   */
+  @Test
+  public void test_multiple_processor_manager() throws ExecutionException, InterruptedException
+  {
+    final ImmutableList<Long> valueSet1 = ImmutableList.of(1L, 2L, 3L);
+    final ImmutableList<Long> valueSet2 = ImmutableList.of(4L, 5L, 6L);
+    final BlockingQueueFrameChannel outputChannel = new BlockingQueueFrameChannel(20);
+
+    final ChainedProcessorManager<List<Long>, Long, Long> manager = new ChainedProcessorManager<>(
+        ProcessorManagers.of(
+            ImmutableList.of(
+                new SimpleReturningFrameProcessor<>(valueSet1),
+                new SimpleReturningFrameProcessor<>(valueSet2)
+            )
+        ),
+        firstStageResults -> {
+          final WritableFrameChannel sink = outputChannel.writable();
+          final List<Long> flattened =
+              firstStageResults.stream().flatMap(List::stream).collect(Collectors.toList());
+          return createNextProcessors(sink, flattened);
+        }
+    );
+
+    final ListenableFuture<Long> runFuture =
+        exec.runAllFully(manager, maxOutstandingProcessors, bouncer, null);
+    runFuture.get();
+
+    final Set<Long> expectedValues = new HashSet<>(valueSet1);
+    expectedValues.addAll(valueSet2);
+
+    final Set<Long> actualValues = new HashSet<>();
+    try (ReadableFrameChannel readable = outputChannel.readable()) {
+      while (readable.canRead()) {
+        final Frame frame = readable.read();
+        actualValues.add(extractColumnValue(frame, 1));
+      }
+    }
+    Assert.assertEquals(expectedValues, actualValues);
+  }
+
+  /**
+   * Test where one of the two first-stage processors is a {@link SequenceProcessorManagerTest.NilFrameProcessor}.
+   * The run must end with an {@link ExecutionException}, and nothing may have been written to the output channel.
+   */
+  @Test
+  public void test_failing_processor_manager()
+  {
+    final Set<Long> expectedValues = Collections.emptySet();
+    final BlockingQueueFrameChannel outputChannel = new BlockingQueueFrameChannel(20);
+
+    final ChainedProcessorManager<List<Long>, Long, Long> manager = new ChainedProcessorManager<>(
+        ProcessorManagers.of(
+            ImmutableList.of(
+                new SimpleReturningFrameProcessor<>(ImmutableList.of(4L, 5L, 6L)),
+                new SequenceProcessorManagerTest.NilFrameProcessor<>()
+            )
+        ),
+        firstStageResults -> {
+          final WritableFrameChannel sink = new NonFailingWritableFrameChannel(outputChannel.writable());
+          final List<Long> flattened =
+              firstStageResults.stream().flatMap(List::stream).collect(Collectors.toList());
+          return createNextProcessors(sink, flattened);
+        }
+    );
+
+    final ListenableFuture<Long> runFuture =
+        exec.runAllFully(manager, maxOutstandingProcessors, bouncer, null);
+
+    Assert.assertThrows(ExecutionException.class, runFuture::get);
+
+    final Set<Long> actualValues = new HashSet<>();
+    try (ReadableFrameChannel readable = outputChannel.readable()) {
+      while (readable.canRead()) {
+        final Frame frame = readable.read();
+        actualValues.add(extractColumnValue(frame, 1));
+      }
+    }
+    Assert.assertEquals(expectedValues, actualValues);
+  }
+
+  /**
+   * Runs the recursive chain over the values 1 through 9 with no failing value; every value is expected in the
+   * output.
+   */
+  @Test
+  public void test_chaining_processor_manager()
+  {
+    final Set<Long> allValues = LongStream.rangeClosed(1, 9).boxed().collect(Collectors.toSet());
+    runChainedExceptionTest(allValues, allValues, null);
+  }
+
+  /**
+   * Runs the recursive chain over the values 1 through 9 once for every possible failing value. For each run, only
+   * the values smaller than the failing value are expected in the output.
+   */
+  @Test
+  public void test_chaining_processor_manager_with_exception()
+  {
+    final Set<Long> allValues = LongStream.rangeClosed(1, 9).boxed().collect(Collectors.toSet());
+
+    for (long failingValue = 1; failingValue <= 9; failingValue++) {
+      final Set<Long> writtenBeforeFailure =
+          LongStream.range(1, failingValue).boxed().collect(Collectors.toSet());
+      runChainedExceptionTest(allValues, writtenBeforeFailure, failingValue);
+    }
+  }
+
+  /**
+   * Runs a recursive chain built by {@link #chainedProcessors} over {@code values}, then verifies both how the run
+   * ended and which values were written to the output channel.
+   *
+   * @param values         values handed to the chain; they are processed in ascending order
+   * @param expectedValues values that must have been written to the output channel once the run has ended
+   * @param failingValue   value at which the chain throws, or null if no processor should fail
+   */
+  public void runChainedExceptionTest(Set<Long> values, Set<Long> expectedValues, Long failingValue)
+  {
+    final BlockingQueueFrameChannel outputChannel = new BlockingQueueFrameChannel(20);
+
+    // Sort explicitly: callers build "values" with Collectors.toSet(), which gives no iteration-order guarantee,
+    // while the expected output assumes that every value below the failing one is written first.
+    final List<Long> orderedValues = values.stream().sorted().collect(Collectors.toList());
+
+    final ProcessorManager<List<Object>, Long> chainedProcessorManager =
+        chainedProcessors(outputChannel.writable(), orderedValues, failingValue);
+
+    final ListenableFuture<Long> future = exec.runAllFully(
+        chainedProcessorManager,
+        maxOutstandingProcessors,
+        bouncer,
+        null
+    );
+
+    // The chain only throws when it actually reaches the failing value.
+    final boolean expectFailure = failingValue != null && values.contains(failingValue);
+    if (expectFailure) {
+      Assert.assertThrows(ExecutionException.class, future::get);
+    } else {
+      try {
+        future.get();
+      }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new AssertionError("Interrupted while waiting for the chain to finish", e);
+      }
+      catch (ExecutionException e) {
+        throw new AssertionError("Chain without a failing value should complete successfully", e);
+      }
+    }
+
+    final Set<Long> actualValues = new HashSet<>();
+    try (ReadableFrameChannel readable = outputChannel.readable()) {
+      while (readable.canRead()) {
+        actualValues.add(extractColumnValue(readable.read(), 1));
+      }
+    }
+    Assert.assertEquals(expectedValues, actualValues);
+  }
+
+  /**
+   * Reads one long value from a frame laid out according to {@link #ROW_SIGNATURE}.
+   * The cursor created for the frame is never advanced, so only the row it starts on is read.
+   *
+   * @param frame    frame to read from
+   * @param columnNo position, within {@link #ROW_SIGNATURE}, of the column to read
+   * @return the selected column's value as a long
+   */
+  private Long extractColumnValue(Frame frame, int columnNo)
+  {
+    final FrameReader frameReader = FrameReader.create(ROW_SIGNATURE);
+    final FrameCursor frameCursor = FrameProcessors.makeCursor(frame, frameReader);
+    final ColumnSelectorFactory columnSelectorFactory = frameCursor.getColumnSelectorFactory();
+    // An unbounded wildcard avoids the raw type, so no warning suppression is needed; only getLong() is used.
+    final ColumnValueSelector<?> columnValueSelector =
+        columnSelectorFactory.makeColumnValueSelector(ROW_SIGNATURE.getColumnName(columnNo));
+    return columnValueSelector.getLong();
+  }
+
+  /**
+   * Builds a manager holding one {@link SingleRowWritingFrameProcessor} per value. Every processor writes its value,
+   * stored under the second column of {@link #ROW_SIGNATURE}, to {@code frameChannel}.
+   */
+  private ProcessorManager<Long, Long> createNextProcessors(WritableFrameChannel frameChannel, List<Long> values)
+  {
+    final List<SingleRowWritingFrameProcessor> writers =
+        values.stream()
+              .map(value -> new SingleRowWritingFrameProcessor(
+                  frameChannel,
+                  makeInputRow(ROW_SIGNATURE.getColumnName(1), value)
+              ))
+              .collect(Collectors.toList());
+    return ProcessorManagers.of(writers);
+  }
+
+  /**
+   * Creates an {@link InputRow} with timestamp 0 from alternating key/value arguments. The dimension list is the key
+   * set of the map built by {@link TestHelper#makeMap}.
+   */
+  private static InputRow makeInputRow(Object... kv)
+  {
+    final Map<String, Object> event = TestHelper.makeMap(true, kv);
+    final List<String> dimensions = ImmutableList.copyOf(event.keySet());
+    return new MapBasedInputRow(0L, dimensions, event);
+  }
+
+  /**
+   * Processor used to build recursive chains. It writes the first element of its list to the output channel as a
+   * single-row frame and returns the remaining elements. If the first element equals the failure value, it throws
+   * before writing anything. The output channel is wrapped in a {@link NonFailingWritableFrameChannel}.
+   */
+  public static class PrintFirstAndReturnRestFrameProcessor extends SingleChannelFrameProcessor<List<Long>>
+  {
+    private final List<Long> valuesList;
+    private final Long failureValue;
+
+    public PrintFirstAndReturnRestFrameProcessor(
+        WritableFrameChannel outputChannel,
+        List<Long> valuesList,
+        Long failureValue
+    )
+    {
+      super(null, new NonFailingWritableFrameChannel(outputChannel));
+      this.valuesList = valuesList;
+      this.failureValue = failureValue;
+    }
+
+    @Override
+    public List<Long> doSimpleWork() throws IOException
+    {
+      final Long head = valuesList.get(0);
+      if (Objects.equals(head, failureValue)) {
+        throw new RuntimeException();
+      }
+
+      final InputRow row = makeInputRow(ROW_SIGNATURE.getColumnName(1), head);
+      final WritableFrameChannel channel = Iterables.getOnlyElement(outputChannels());
+      channel.write(TestFrameProcessorUtils.toFrame(Collections.singletonList(row)));
+
+      final int size = valuesList.size();
+      return size == 1 ? Collections.emptyList() : valuesList.subList(1, size);
+    }
+  }
+
+ /**
+ * Recursively builds a chain of {@link ChainedProcessorManager}s over {@code values}. Each level runs one
+ * {@link PrintFirstAndReturnRestFrameProcessor} on the current list, then builds the next level from the values
+ * that processor returned. An empty list ends the recursion with {@link ProcessorManagers#none()}.
+ *
+ * @param writableFrameChannel channel handed to every processor in the chain
+ * @param values values still to be processed at this level
+ * @param failureValue value at which a processor throws; passed unchanged to every level
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static ProcessorManager<List<Object>, Long> chainedProcessors(
+ WritableFrameChannel writableFrameChannel,
+ List<Long> values,
+ Long failureValue
+ )
+ {
+ // Base case of the recursion: nothing left to process.
+ if (values.isEmpty()) {
+ return ProcessorManagers.none();
+ }
+ ChainedProcessorManager<List<Long>, List<Long>, Long> processorManager =
new ChainedProcessorManager<>(
+ ProcessorManagers.of(() -> new
PrintFirstAndReturnRestFrameProcessor(writableFrameChannel, values,
failureValue)),
+ returnedValues -> {
+ List<Long> lists =
returnedValues.stream().flatMap(List::stream).collect(Collectors.toList());
+ return (ProcessorManager) chainedProcessors(
+ writableFrameChannel,
+ lists,
+ failureValue
+ );
+ }
+ );
+ // The raw casts bridge the declared return type and the concrete type parameters used at each level.
+ return (ProcessorManager) processorManager;
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/NonFailingWritableFrameChannel.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/NonFailingWritableFrameChannel.java
new file mode 100644
index 00000000000..07b1dc2af7a
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/NonFailingWritableFrameChannel.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * Test-only {@link WritableFrameChannel} decorator that forwards every call to a delegate except
+ * {@link #fail(Throwable)}, which is deliberately a no-op. Presumably this keeps the delegate readable after a
+ * processor fails, so that tests can inspect what was written before the failure.
+ */
+public class NonFailingWritableFrameChannel implements WritableFrameChannel
+{
+  private final WritableFrameChannel delegate;
+
+  /**
+   * @param delegate channel that receives all writes, closes and state queries
+   */
+  public NonFailingWritableFrameChannel(WritableFrameChannel delegate)
+  {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void write(FrameWithPartition frameWithPartition) throws IOException
+  {
+    delegate.write(frameWithPartition);
+  }
+
+  /**
+   * Intentionally does nothing: the failure is not propagated to the delegate.
+   */
+  @Override
+  public void fail(@Nullable Throwable cause)
+  {
+    // Deliberately empty, see the method documentation.
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    delegate.close();
+  }
+
+  @Override
+  public boolean isClosed()
+  {
+    return delegate.isClosed();
+  }
+
+  @Override
+  public ListenableFuture<?> writabilityFuture()
+  {
+    return delegate.writabilityFuture();
+  }
+}
diff --git
a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
index b31818964ea..55e5f7bb81c 100644
---
a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
+++
b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
@@ -398,7 +398,7 @@ public class FrameProcessorExecutorTest
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
public final int numThreads;
- FrameProcessorExecutor exec;
+ protected FrameProcessorExecutor exec;
public BaseFrameProcessorExecutorTestSuite(int numThreads)
{
diff --git
a/processing/src/test/java/org/apache/druid/frame/processor/manager/SequenceProcessorManagerTest.java
b/processing/src/test/java/org/apache/druid/frame/processor/manager/SequenceProcessorManagerTest.java
index 59aa8d651ee..a1ce465540a 100644
---
a/processing/src/test/java/org/apache/druid/frame/processor/manager/SequenceProcessorManagerTest.java
+++
b/processing/src/test/java/org/apache/druid/frame/processor/manager/SequenceProcessorManagerTest.java
@@ -140,7 +140,7 @@ public class SequenceProcessorManagerTest
Assert.assertEquals(0, closed.get());
}
- private static class NilFrameProcessor implements FrameProcessor<Unit>
+ public static class NilFrameProcessor<T> implements FrameProcessor<T>
{
@Override
public List<ReadableFrameChannel> inputChannels()
@@ -155,7 +155,7 @@ public class SequenceProcessorManagerTest
}
@Override
- public ReturnOrAwait<Unit> runIncrementally(IntSet readableInputs)
+ public ReturnOrAwait<T> runIncrementally(IntSet readableInputs)
{
throw new UnsupportedOperationException();
}
diff --git
a/processing/src/test/java/org/apache/druid/frame/processor/test/SimpleReturningFrameProcessor.java
b/processing/src/test/java/org/apache/druid/frame/processor/test/SimpleReturningFrameProcessor.java
new file mode 100644
index 00000000000..d25fba2962f
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/frame/processor/test/SimpleReturningFrameProcessor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.test;
+
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.processor.FrameProcessor;
+
+/**
+ * A {@link FrameProcessor} which doesn't write anything to channels: it is constructed with neither an input
+ * nor an output channel. Its {@link #doSimpleWork()} simply returns the {@link #returnValue} it was given
+ * at construction time.
+ *
+ * @param <T> type of the value returned by the processor
+ */
+public class SimpleReturningFrameProcessor<T> extends
SingleChannelFrameProcessor<T>
+{
+ private final T returnValue;
+
+ public SimpleReturningFrameProcessor(T returnValue)
+ {
+ super(null, null);
+ this.returnValue = returnValue;
+ }
+
+ @Override
+ public T doSimpleWork()
+ {
+ return returnValue;
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/frame/processor/test/SingleChannelFrameProcessor.java
b/processing/src/test/java/org/apache/druid/frame/processor/test/SingleChannelFrameProcessor.java
new file mode 100644
index 00000000000..4d15bdbc16f
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/frame/processor/test/SingleChannelFrameProcessor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.test;
+
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Frame processor that reads from at most one channel and writes to at most one channel. Each call to
+ * {@link #runIncrementally(IntSet)} invokes {@link #doSimpleWork()} and returns its result as the processor's
+ * final value.
+ *
+ * @param <T> type of the value produced by {@link #doSimpleWork()}
+ */
+public abstract class SingleChannelFrameProcessor<T> implements FrameProcessor<T>
+{
+  @Nullable
+  private final ReadableFrameChannel readableFrameChannel;
+  @Nullable
+  private final WritableFrameChannel writableFrameChannel;
+
+  /**
+   * @param readableFrameChannel input channel, or null if the processor has no input
+   * @param writableFrameChannel output channel, or null if the processor has no output
+   */
+  public SingleChannelFrameProcessor(
+      @Nullable ReadableFrameChannel readableFrameChannel,
+      @Nullable WritableFrameChannel writableFrameChannel
+  )
+  {
+    this.readableFrameChannel = readableFrameChannel;
+    this.writableFrameChannel = writableFrameChannel;
+  }
+
+  /**
+   * Returns the input channel as a single-element list, or an empty list when there is none.
+   */
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return readableFrameChannel == null
+           ? Collections.emptyList()
+           : Collections.singletonList(readableFrameChannel);
+  }
+
+  /**
+   * Returns the output channel as a single-element list, or an empty list when there is none.
+   */
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return writableFrameChannel == null
+           ? Collections.emptyList()
+           : Collections.singletonList(writableFrameChannel);
+  }
+
+  @Override
+  public ReturnOrAwait<T> runIncrementally(IntSet readableInputs) throws IOException
+  {
+    final T result = doSimpleWork();
+    return ReturnOrAwait.returnObject(result);
+  }
+
+  /**
+   * Performs the processor's entire work and produces its return value.
+   */
+  public abstract T doSimpleWork() throws IOException;
+
+  @Override
+  public void cleanup()
+  {
+  }
+}
diff --git
a/processing/src/test/java/org/apache/druid/frame/processor/test/SingleRowWritingFrameProcessor.java
b/processing/src/test/java/org/apache/druid/frame/processor/test/SingleRowWritingFrameProcessor.java
new file mode 100644
index 00000000000..ef6c80390de
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/frame/processor/test/SingleRowWritingFrameProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+
+import java.io.IOException;
+
+/**
+ * Test processor with no input channel that writes a single frame, built from one {@link InputRow}, to its only
+ * output channel and then returns 1.
+ */
+public class SingleRowWritingFrameProcessor extends SingleChannelFrameProcessor<Long>
+{
+  private final InputRow inputRow;
+
+  /**
+   * @param writableFrameChannel channel the frame is written to
+   * @param inputRow             row that makes up the written frame
+   */
+  public SingleRowWritingFrameProcessor(WritableFrameChannel writableFrameChannel, InputRow inputRow)
+  {
+    super(null, writableFrameChannel);
+    this.inputRow = inputRow;
+  }
+
+  @Override
+  public Long doSimpleWork() throws IOException
+  {
+    final WritableFrameChannel sink = Iterables.getOnlyElement(outputChannels());
+    final ImmutableList<InputRow> rows = ImmutableList.of(inputRow);
+    sink.write(TestFrameProcessorUtils.toFrame(rows));
+    return 1L;
+  }
+}
diff --git
a/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
b/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
new file mode 100644
index 00000000000..0a25993a49a
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.test;
+
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.testutil.FrameSequenceBuilder;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+
+import java.util.List;
+
+/**
+ * Static helpers that turn lists of {@link InputRow}s into {@link StorageAdapter}s and {@link Frame}s for frame
+ * processor tests.
+ */
+public final class TestFrameProcessorUtils
+{
+  private TestFrameProcessorUtils()
+  {
+  }
+
+  /**
+   * Loads the rows into an on-heap incremental index (no rollup, schema discovery enabled, at most 1000 rows) and
+   * returns a storage adapter over it.
+   *
+   * @throws RuntimeException wrapping {@link IndexSizeExceededException} if the index rejects a row
+   */
+  public static StorageAdapter toStorageAdapter(List<InputRow> inputRows)
+  {
+    final IncrementalIndexSchema schema = new IncrementalIndexSchema(
+        0,
+        new TimestampSpec("__time", "millis", null),
+        Granularities.NONE,
+        VirtualColumns.EMPTY,
+        DimensionsSpec.builder().useSchemaDiscovery(true).build(),
+        new AggregatorFactory[0],
+        false
+    );
+
+    final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
+        .setIndexSchema(schema)
+        .setMaxRowCount(1000)
+        .build();
+
+    try {
+      for (InputRow row : inputRows) {
+        index.add(row);
+      }
+    }
+    catch (IndexSizeExceededException e) {
+      throw new RuntimeException(e);
+    }
+
+    return new IncrementalIndexStorageAdapter(index);
+  }
+
+  /**
+   * Converts the rows into a row-based frame. The rows must produce exactly one frame;
+   * {@link Iterables#getOnlyElement} is used to extract it.
+   */
+  public static Frame toFrame(List<InputRow> inputRows)
+  {
+    final StorageAdapter adapter = toStorageAdapter(inputRows);
+    final List<Frame> frames = FrameSequenceBuilder.fromAdapter(adapter)
+                                                   .frameType(FrameType.ROW_BASED)
+                                                   .frames()
+                                                   .toList();
+    return Iterables.getOnlyElement(frames);
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]