This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new bb1c3c17494 Add serde for ColumnBasedRowsAndColumns to fix window
queries without group by (#16658)
bb1c3c17494 is described below
commit bb1c3c174944460c22c6dd153579dd18994b1f60
Author: Sree Charan Manamala <[email protected]>
AuthorDate: Tue Sep 17 10:14:40 2024 +0530
Add serde for ColumnBasedRowsAndColumns to fix window queries without group
by (#16658)
Register a serde (serializer/deserializer) for RowsAndColumns so that the
results of window operator queries running on leaf operators are transferred
properly over the wire. This fixes the empty response returned by window
queries without GROUP BY on the native engine.
---
.../jackson/DruidDefaultSerializersModule.java | 18 +----
.../WindowOperatorQueryQueryRunnerFactory.java | 18 +----
.../query/rowsandcols/AppendableMapOfColumns.java | 10 ---
.../query/rowsandcols/ConcatRowsAndColumns.java | 7 --
.../rowsandcols/CursorFactoryRowsAndColumns.java | 2 +-
.../query/rowsandcols/EmptyRowsAndColumns.java | 8 --
.../rowsandcols/LazilyDecoratedRowsAndColumns.java | 14 +---
.../query/rowsandcols/LimitedRowsAndColumns.java | 9 ---
.../rowsandcols/MapOfColumnsRowsAndColumns.java | 2 +-
.../rowsandcols/RearrangedRowsAndColumns.java | 7 --
.../druid/query/rowsandcols/RowsAndColumns.java | 86 +++++++++++++++++++++-
...lumns.java => AbstractFrameRowsAndColumns.java} | 70 ++++++++++--------
.../concrete/ColumnBasedFrameRowsAndColumns.java | 54 +++-----------
.../FrameRowsAndColumns.java} | 18 ++---
.../concrete/RowBasedFrameRowsAndColumns.java | 44 +----------
.../druid/jackson/DefaultObjectMapperTest.java | 26 +++++++
.../query/rowsandcols/NoAsRowsAndColumns.java | 9 ---
.../ColumnBasedFrameRowsAndColumnsTest.java | 10 ++-
.../semantic/RowsAndColumnsDecoratorTest.java | 37 ++++++++++
.../druid/sql/calcite/run/NativeSqlEngine.java | 2 +-
.../apache/druid/sql/calcite/CalciteQueryTest.java | 1 +
.../druid/sql/calcite/CalciteWindowQueryTest.java | 2 +-
.../apache/druid/sql/calcite/NotYetSupported.java | 1 +
.../window/wikipediaFramedAggregations.sqlTest | 2 +-
24 files changed, 229 insertions(+), 228 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java
b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java
index 30cc388f1d9..cad8fdfd831 100644
---
a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java
+++
b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java
@@ -37,7 +37,6 @@ import
org.apache.druid.query.FrameBasedInlineDataSourceSerializer;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContextDeserializer;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
-import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.joda.time.DateTimeZone;
import java.io.IOException;
@@ -189,20 +188,7 @@ public class DruidDefaultSerializersModule extends
SimpleModule
);
addDeserializer(ResponseContext.class, new ResponseContextDeserializer());
- addSerializer(RowsAndColumns.class, new JsonSerializer<RowsAndColumns>()
- {
- @Override
- public void serialize(
- RowsAndColumns value,
- JsonGenerator gen,
- SerializerProvider serializers
- ) throws IOException
- {
- // It would be really cool if jackson offered an output stream that
would allow us to push bytes
- // through, but it doesn't right now, so we have to build a byte[]
instead. Maybe something to contribute
- // back to Jackson at some point.
- gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer());
- }
- });
+ addSerializer(RowsAndColumns.class, new
RowsAndColumns.RowsAndColumnsSerializer());
+ addDeserializer(RowsAndColumns.class, new
RowsAndColumns.RowsAndColumnsDeserializer());
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java
b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java
index d18f6c252c1..f86f91be18b 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java
@@ -21,7 +21,6 @@ package org.apache.druid.query.operator;
import com.google.common.base.Function;
import org.apache.druid.error.DruidException;
-import org.apache.druid.frame.Frame;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -31,10 +30,8 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
-import
org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
-import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
+import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.segment.Segment;
-import org.apache.druid.segment.column.RowSignature;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -100,19 +97,8 @@ public class WindowOperatorQueryQueryRunnerFactory
implements QueryRunnerFactory
@Override
public RowsAndColumns apply(@Nullable RowsAndColumns
input)
{
- // This is interim code to force a materialization by
synthesizing the wire transfer
- // that will need to naturally happen as we flesh out
this code more. For now, we
- // materialize the bytes on-heap and then read them
back in as a frame.
if (input instanceof LazilyDecoratedRowsAndColumns) {
- final WireTransferable wire =
WireTransferable.fromRAC(input);
- final byte[] frameBytes = wire.bytesToTransfer();
-
- RowSignature.Builder sigBob = RowSignature.builder();
- for (String column : input.getColumnNames()) {
- sigBob.add(column,
input.findColumn(column).toAccessor().getType());
- }
-
- return new
ColumnBasedFrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build());
+ return input.as(FrameRowsAndColumns.class);
}
return input;
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java
index 61f6855cd01..d83f56c7ba5 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java
@@ -79,14 +79,4 @@ public class AppendableMapOfColumns implements
AppendableRowsAndColumns
}
return retVal;
}
-
- @Override
- @SuppressWarnings("unchecked")
- public <T> T as(Class<T> clazz)
- {
- if (AppendableRowsAndColumns.class.equals(clazz)) {
- return (T) this;
- }
- return null;
- }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java
index c6ced60849d..3f70f82a253 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java
@@ -141,13 +141,6 @@ public class ConcatRowsAndColumns implements RowsAndColumns
}
}
- @Nullable
- @Override
- public <T> T as(Class<T> clazz)
- {
- return null;
- }
-
private class ConcatedidColumn implements Column
{
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java
index 6fa74660f7d..46fda857516 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java
@@ -61,7 +61,7 @@ public class CursorFactoryRowsAndColumns implements
CloseableShapeshifter, RowsA
if (CursorFactory.class == clazz) {
return (T) cursorFactory;
}
- return null;
+ return RowsAndColumns.super.as(clazz);
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java
index dd0c7dab1cd..56647e0f568 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java
@@ -21,7 +21,6 @@ package org.apache.druid.query.rowsandcols;
import org.apache.druid.query.rowsandcols.column.Column;
-import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
@@ -44,11 +43,4 @@ public class EmptyRowsAndColumns implements RowsAndColumns
{
return null;
}
-
- @Nullable
- @Override
- public <T> T as(Class<T> clazz)
- {
- return null;
- }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java
index a05b31dc2cb..bb35f683797 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java
@@ -39,10 +39,10 @@ import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import
org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
import
org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator;
import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator;
-import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
@@ -150,16 +150,10 @@ public class LazilyDecoratedRowsAndColumns implements
RowsAndColumns
@SuppressWarnings("unused")
@SemanticCreator
- public WireTransferable toWireTransferable()
+ public FrameRowsAndColumns toFrameRowsAndColumns()
{
- return () -> {
- final Pair<byte[], RowSignature> materialized = materialize();
- if (materialized == null) {
- return new byte[]{};
- } else {
- return materialized.lhs;
- }
- };
+ maybeMaterialize();
+ return base.as(FrameRowsAndColumns.class);
}
private void maybeMaterialize()
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java
index abb3d4649b1..8cfadecb4dd 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java
@@ -23,7 +23,6 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.LimitedColumn;
-import javax.annotation.Nullable;
import java.util.Collection;
public class LimitedRowsAndColumns implements RowsAndColumns
@@ -66,12 +65,4 @@ public class LimitedRowsAndColumns implements RowsAndColumns
return new LimitedColumn(column, start, end);
}
-
- @Nullable
- @Override
- public <T> T as(Class<T> clazz)
- {
- return null;
- }
-
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
index 29f092f6744..d6bc1026a98 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java
@@ -164,7 +164,7 @@ public class MapOfColumnsRowsAndColumns implements
RowsAndColumns
if (AppendableRowsAndColumns.class.equals(clazz)) {
return (T) new AppendableMapOfColumns(this);
}
- return null;
+ return RowsAndColumns.super.as(clazz);
}
public static class Builder
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
index f1793f8fd0e..e64f086edd7 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
@@ -164,11 +164,4 @@ public class RearrangedRowsAndColumns implements
RowsAndColumns
);
}
}
-
- @Nullable
- @Override
- public <T> T as(Class<T> clazz)
- {
- return null;
- }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java
index 7b6a1f6215d..a34d0e463c0 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java
@@ -19,12 +19,30 @@
package org.apache.druid.query.rowsandcols;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.channel.ByteTracker;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.rowsandcols.column.Column;
+import
org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
+import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
import java.util.Collection;
/**
@@ -110,6 +128,72 @@ public interface RowsAndColumns
* @return A concrete implementation of the interface, or null if there is
no meaningful optimization to be had
* through a local implementation of the interface.
*/
+ @SuppressWarnings("unchecked")
@Nullable
- <T> T as(Class<T> clazz);
+ default <T> T as(Class<T> clazz)
+ {
+ if (clazz.isInstance(this)) {
+ return (T) this;
+ }
+ return null;
+ }
+
+ /**
+ * Serializer for {@link RowsAndColumns} by converting the instance to
{@link FrameRowsAndColumns}
+ */
+ class RowsAndColumnsSerializer extends StdSerializer<RowsAndColumns>
+ {
+ public RowsAndColumnsSerializer()
+ {
+ super(RowsAndColumns.class);
+ }
+
+ @Override
+ public void serialize(
+ RowsAndColumns rac,
+ JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider
+ ) throws IOException
+ {
+ FrameRowsAndColumns frameRAC = rac.as(FrameRowsAndColumns.class);
+ if (frameRAC == null) {
+ throw DruidException.defensive("Unable to serialize RAC");
+ }
+ JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator,
serializerProvider, frameRAC.getSignature());
+
+ Frame frame = frameRAC.getFrame();
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ frame.writeTo(Channels.newChannel(baos), false, null,
ByteTracker.unboundedTracker());
+
+ jsonGenerator.writeBinary(baos.toByteArray());
+ }
+ }
+
+ /**
+ * Deserializer for {@link RowsAndColumns} returning as an instance of
{@link FrameRowsAndColumns}
+ */
+ class RowsAndColumnsDeserializer extends StdDeserializer<RowsAndColumns>
+ {
+ public RowsAndColumnsDeserializer()
+ {
+ super(RowsAndColumns.class);
+ }
+
+ @Override
+ public FrameRowsAndColumns deserialize(JsonParser jsonParser,
DeserializationContext deserializationContext)
+ throws IOException
+ {
+ RowSignature sig = jsonParser.readValueAs(RowSignature.class);
+ jsonParser.nextValue();
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ jsonParser.readBinaryValue(baos);
+ Frame frame = Frame.wrap(baos.toByteArray());
+ if (frame.type() == FrameType.COLUMNAR) {
+ return new ColumnBasedFrameRowsAndColumns(frame, sig);
+ } else {
+ return new RowBasedFrameRowsAndColumns(frame, sig);
+ }
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java
similarity index 58%
copy from
processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java
copy to
processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java
index e99a3f7f313..5295326c862 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java
@@ -19,64 +19,52 @@
package org.apache.druid.query.rowsandcols.concrete;
+import com.google.common.base.Objects;
import org.apache.druid.frame.Frame;
-import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.read.FrameReader;
-import org.apache.druid.frame.read.columnar.FrameColumnReaders;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.CursorFactory;
-import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.LinkedHashMap;
-public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns,
AutoCloseable, CloseableShapeshifter
+public abstract class AbstractFrameRowsAndColumns implements
FrameRowsAndColumns, AutoCloseable, CloseableShapeshifter
{
- private final Frame frame;
- private final RowSignature signature;
- private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();
+ final Frame frame;
+ final RowSignature signature;
+ final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();
- public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature)
+ public AbstractFrameRowsAndColumns(Frame frame, RowSignature signature)
{
- this.frame = FrameType.COLUMNAR.ensureType(frame);
+ this.frame = frame;
this.signature = signature;
}
@Override
- public Collection<String> getColumnNames()
+ public Frame getFrame()
{
- return signature.getColumnNames();
+ return frame;
}
@Override
- public int numRows()
+ public RowSignature getSignature()
{
- return frame.numRows();
+ return signature;
}
- @Nullable
@Override
- public Column findColumn(String name)
+ public Collection<String> getColumnNames()
{
- // Use contains so that we can negative cache.
- if (!colCache.containsKey(name)) {
- final int columnIndex = signature.indexOf(name);
- if (columnIndex < 0) {
- colCache.put(name, null);
- } else {
- final ColumnType columnType = signature
- .getColumnType(columnIndex)
- .orElseThrow(() -> new ISE("just got the id, why is columnType not
there?"));
+ return signature.getColumnNames();
+ }
- colCache.put(name, FrameColumnReaders.create(name, columnIndex,
columnType).readRACColumn(frame));
- }
- }
- return colCache.get(name);
+ @Override
+ public int numRows()
+ {
+ return frame.numRows();
}
@SuppressWarnings("unchecked")
@@ -87,7 +75,7 @@ public class ColumnBasedFrameRowsAndColumns implements
RowsAndColumns, AutoClose
if (CursorFactory.class.equals(clazz)) {
return (T) FrameReader.create(signature).makeCursorFactory(frame);
}
- return null;
+ return FrameRowsAndColumns.super.as(clazz);
}
@Override
@@ -95,4 +83,24 @@ public class ColumnBasedFrameRowsAndColumns implements
RowsAndColumns, AutoClose
{
// nothing to close
}
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(frame, signature);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof AbstractFrameRowsAndColumns)) {
+ return false;
+ }
+ AbstractFrameRowsAndColumns otherFrame = (AbstractFrameRowsAndColumns) o;
+
+ return frame.writableMemory().equals(otherFrame.frame.writableMemory()) &&
signature.equals(otherFrame.signature);
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java
index e99a3f7f313..c4a4577dc1a 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java
@@ -19,44 +19,21 @@
package org.apache.druid.query.rowsandcols.concrete;
+import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
-import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.read.columnar.FrameColumnReaders;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
-import org.apache.druid.segment.CloseableShapeshifter;
-import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns,
AutoCloseable, CloseableShapeshifter
+public class ColumnBasedFrameRowsAndColumns extends AbstractFrameRowsAndColumns
{
- private final Frame frame;
- private final RowSignature signature;
- private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();
-
public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature)
{
- this.frame = FrameType.COLUMNAR.ensureType(frame);
- this.signature = signature;
- }
-
- @Override
- public Collection<String> getColumnNames()
- {
- return signature.getColumnNames();
- }
-
- @Override
- public int numRows()
- {
- return frame.numRows();
+ super(FrameType.COLUMNAR.ensureType(frame), signature);
}
@Nullable
@@ -71,28 +48,17 @@ public class ColumnBasedFrameRowsAndColumns implements
RowsAndColumns, AutoClose
} else {
final ColumnType columnType = signature
.getColumnType(columnIndex)
- .orElseThrow(() -> new ISE("just got the id, why is columnType not
there?"));
+ .orElseThrow(
+ () -> DruidException.defensive(
+ "just got the id [%s][%s], why is columnType not there?",
+ columnIndex,
+ name
+ )
+ );
colCache.put(name, FrameColumnReaders.create(name, columnIndex,
columnType).readRACColumn(frame));
}
}
return colCache.get(name);
}
-
- @SuppressWarnings("unchecked")
- @Nullable
- @Override
- public <T> T as(Class<T> clazz)
- {
- if (CursorFactory.class.equals(clazz)) {
- return (T) FrameReader.create(signature).makeCursorFactory(frame);
- }
- return null;
- }
-
- @Override
- public void close()
- {
- // nothing to close
- }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java
similarity index 67%
rename from
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java
rename to
processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java
index a7d55f59929..022a0f91ac1 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java
@@ -17,21 +17,15 @@
* under the License.
*/
-package org.apache.druid.query.rowsandcols.semantic;
+package org.apache.druid.query.rowsandcols.concrete;
-import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.frame.Frame;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.segment.column.RowSignature;
-public interface WireTransferable
+public interface FrameRowsAndColumns extends RowsAndColumns
{
- static WireTransferable fromRAC(RowsAndColumns rac)
- {
- WireTransferable retVal = rac.as(WireTransferable.class);
- if (retVal == null) {
- throw new ISE("Rac[%s] cannot be transferred over the wire",
rac.getClass());
- }
- return retVal;
- }
+ Frame getFrame();
- byte[] bytesToTransfer();
+ RowSignature getSignature();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java
index 865a24e5d6d..c702c210775 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java
@@ -24,40 +24,17 @@ import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.field.FieldReader;
import org.apache.druid.frame.field.FieldReaders;
-import org.apache.druid.frame.read.FrameReader;
-import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
-import org.apache.druid.segment.CloseableShapeshifter;
-import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-public class RowBasedFrameRowsAndColumns implements RowsAndColumns,
AutoCloseable, CloseableShapeshifter
+public class RowBasedFrameRowsAndColumns extends AbstractFrameRowsAndColumns
{
- private final Frame frame;
- private final RowSignature signature;
- private final LinkedHashMap<String, Column> colCache = new LinkedHashMap<>();
-
public RowBasedFrameRowsAndColumns(Frame frame, RowSignature signature)
{
- this.frame = FrameType.ROW_BASED.ensureType(frame);
- this.signature = signature;
- }
-
- @Override
- public Collection<String> getColumnNames()
- {
- return signature.getColumnNames();
- }
-
- @Override
- public int numRows()
- {
- return frame.numRows();
+ super(FrameType.ROW_BASED.ensureType(frame), signature);
}
@Nullable
@@ -86,21 +63,4 @@ public class RowBasedFrameRowsAndColumns implements
RowsAndColumns, AutoCloseabl
}
return colCache.get(name);
}
-
- @SuppressWarnings("unchecked")
- @Nullable
- @Override
- public <T> T as(Class<T> clazz)
- {
- if (CursorFactory.class.equals(clazz)) {
- return (T) FrameReader.create(signature).makeCursorFactory(frame);
- }
- return null;
- }
-
- @Override
- public void close()
- {
- // nothing to close
- }
}
diff --git
a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java
b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java
index 989d137770e..92c8a2cb298 100644
---
a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java
+++
b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java
@@ -22,12 +22,18 @@ package org.apache.druid.jackson;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
+import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.query.Query;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
+import
org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
+import
org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Assert;
@@ -35,6 +41,8 @@ import org.junit.Test;
import java.util.Arrays;
+import static org.junit.Assert.assertEquals;
+
/**
*
*/
@@ -102,4 +110,22 @@ public class DefaultObjectMapperTest
}
Assert.fail("We expect InvalidTypeIdException to be thrown");
}
+
+ @Test
+ public void testColumnBasedFrameRowsAndColumns() throws Exception
+ {
+ DefaultObjectMapper om = new DefaultObjectMapper("test");
+
+ MapOfColumnsRowsAndColumns input = (MapOfColumnsRowsAndColumns.fromMap(
+ ImmutableMap.of(
+ "colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9,
10}),
+ "colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4,
0, 0})
+ )));
+
+ ColumnBasedFrameRowsAndColumns frc =
ColumnBasedFrameRowsAndColumnsTest.buildFrame(input);
+ byte[] bytes = om.writeValueAsBytes(frc);
+
+ ColumnBasedFrameRowsAndColumns frc2 = (ColumnBasedFrameRowsAndColumns)
om.readValue(bytes, RowsAndColumns.class);
+ assertEquals(frc, frc2);
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java
b/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java
index 422c87c8b7c..16cd44e870b 100644
---
a/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java
+++
b/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java
@@ -21,7 +21,6 @@ package org.apache.druid.query.rowsandcols;
import org.apache.druid.query.rowsandcols.column.Column;
-import javax.annotation.Nullable;
import java.util.Collection;
public class NoAsRowsAndColumns implements RowsAndColumns
@@ -50,12 +49,4 @@ public class NoAsRowsAndColumns implements RowsAndColumns
{
return rac.findColumn(name);
}
-
- @Nullable
- @Override
- public <T> T as(Class<T> clazz)
- {
- // Pretend like this doesn't implement any semantic interfaces
- return null;
- }
}
diff --git
a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
index acfcbe6f83e..f6a10e01146 100644
---
a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
+++
b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
@@ -37,7 +37,15 @@ public class ColumnBasedFrameRowsAndColumnsTest extends
RowsAndColumnsTestBase
public static ColumnBasedFrameRowsAndColumns
buildFrame(MapOfColumnsRowsAndColumns input)
{
- LazilyDecoratedRowsAndColumns rac = new
LazilyDecoratedRowsAndColumns(input, null, null, null,
OffsetLimit.limit(Integer.MAX_VALUE), null, null);
+ LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(
+ input,
+ null,
+ null,
+ null,
+ OffsetLimit.limit(Integer.MAX_VALUE),
+ null,
+ null
+ );
rac.numRows(); // materialize
return (ColumnBasedFrameRowsAndColumns) rac.getBase();
diff --git
a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java
b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java
index f90c2ea1917..41295f48017 100644
---
a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java
+++
b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.query.rowsandcols.semantic;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@@ -32,6 +33,9 @@ import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
+import
org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
+import
org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest;
import org.apache.druid.segment.ArrayListSegment;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
@@ -214,6 +218,39 @@ public class RowsAndColumnsDecoratorTest extends
SemanticTestBase
}
}
+ @Test
+ public void testDecoratorWithColumnBasedFrameRAC()
+ {
+ RowSignature siggy = RowSignature.builder()
+ .add("colA", ColumnType.LONG)
+ .add("colB", ColumnType.LONG)
+ .build();
+
+ Object[][] vals = new Object[][]{
+ {1L, 4L},
+ {2L, -4L},
+ {3L, 3L},
+ {4L, -3L},
+ {5L, 4L},
+ {6L, 82L},
+ {7L, -90L},
+ {8L, 4L},
+ {9L, 0L},
+ {10L, 0L}
+ };
+
+ MapOfColumnsRowsAndColumns input = MapOfColumnsRowsAndColumns.fromMap(
+ ImmutableMap.of(
+ "colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9,
10}),
+ "colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4,
0, 0})
+ )
+ );
+
+ ColumnBasedFrameRowsAndColumns frc =
ColumnBasedFrameRowsAndColumnsTest.buildFrame(input);
+
+ validateDecorated(frc, siggy, vals, null, null, OffsetLimit.NONE, null);
+ }
+
private void validateDecorated(
RowsAndColumns base,
RowSignature siggy,
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
index 2477ac38dec..d02d302437b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
@@ -109,7 +109,6 @@ public class NativeSqlEngine implements SqlEngine
case ALLOW_TOP_LEVEL_UNION_ALL:
case TIME_BOUNDARY_QUERY:
case GROUPBY_IMPLICITLY_SORTS:
- case WINDOW_LEAF_OPERATOR:
return true;
case CAN_INSERT:
case CAN_REPLACE:
@@ -117,6 +116,7 @@ public class NativeSqlEngine implements SqlEngine
case WRITE_EXTERNAL_DATA:
case SCAN_ORDER_BY_NON_TIME:
case SCAN_NEEDS_SIGNATURE:
+ case WINDOW_LEAF_OPERATOR:
return false;
default:
throw
SqlEngines.generateUnrecognizedFeatureException(NativeSqlEngine.class.getSimpleName(),
feature);
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index a8dcc35ea7a..732681de238 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -16086,6 +16086,7 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
.run();
}
+ @NotYetSupported(Modes.UNSUPPORTED_DATASOURCE)
@Test
public void testWindowingOverJoin()
{
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java
index 5850be0bd1c..cd6aa514675 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java
@@ -298,7 +298,7 @@ public class CalciteWindowQueryTest extends
BaseCalciteQueryTest
);
assertEquals(
- "Encountered a multi value column [v0]. Window processing does not
support MVDs. "
+ "Encountered a multi value column. Window processing does not support
MVDs. "
+ "Consider using UNNEST or MV_TO_ARRAY.",
e.getMessage()
);
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
index da1431f433d..f0c48ff44f2 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
@@ -89,6 +89,7 @@ public @interface NotYetSupported
RESULT_MISMATCH(AssertionError.class, "(assertResulEquals|AssertionError:
column content mismatch)"),
LONG_CASTING(AssertionError.class, "expected: java.lang.Long"),
UNSUPPORTED_NULL_ORDERING(DruidException.class, "(A|DE)SCENDING ordering
with NULLS (LAST|FIRST)"),
+ UNSUPPORTED_DATASOURCE(DruidException.class, "WindowOperatorQuery must run
on top of a query or inline data source"),
UNION_WITH_COMPLEX_OPERAND(DruidException.class, "Only Table and Values
are supported as inputs for Union"),
UNION_MORE_STRICT_ROWTYPE_CHECK(DruidException.class, "Row signature
mismatch in Union inputs"),
JOIN_CONDITION_NOT_PUSHED_CONDITION(DruidException.class, "SQL requires a
join with '.*' condition"),
diff --git
a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest
b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest
index 87873d44c48..104cb0d2422 100644
---
a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest
+++
b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest
@@ -2,7 +2,7 @@ type: "operatorValidation"
sql: |
SELECT
- countryIsoCode,
+ countryIsoCode,
CAST (FLOOR(__time TO HOUR) AS BIGINT) t,
SUM(delta) delta,
SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST
(FLOOR(__time TO HOUR) AS BIGINT) ROWS BETWEEN 3 PRECEDING AND 2 FOLLOWING)
windowedDelta
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]