This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6b9344cd39 Persist legacy LatestPairs for now (#13378)
6b9344cd39 is described below
commit 6b9344cd39017174013f757c274e0d9c8e6671b7
Author: imply-cheddar <[email protected]>
AuthorDate: Fri Nov 18 01:07:02 2022 +0900
Persist legacy LatestPairs for now (#13378)
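We added compression to the latest/first pair storage, but
the change also made every newly persisted segment use
the new format, which meant that any segment created by
the new code could not be read by older versions. Instead,
new segments should default to the old (legacy) format, with
the new compressed format available only as an opt-in (via the
druid.columns.pairLongString.compressed system property). That
default will be removed in a future version, once the code that
can read both formats has been released.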
---
.../CompressedBigDecimalColumn.java | 7 +-
.../CompressedBigDecimalColumnPartSupplier.java | 53 ++++----
.../CompressedBigDecimalColumnTest.java | 3 +-
.../read/columnar/ComplexFrameColumnReader.java | 2 +-
...rializablePairLongStringComplexMetricSerde.java | 101 +++++++++++++++-
.../druid/query/metadata/SegmentAnalyzer.java | 14 +--
.../apache/druid/segment/column/ComplexColumn.java | 2 +-
.../column/GenericIndexedBasedComplexColumn.java | 2 +-
.../segment/column/UnknownTypeComplexColumn.java | 2 +-
.../nested/CompressedNestedDataComplexColumn.java | 2 +-
...izablePairLongStringComplexMetricSerdeTest.java | 134 +++++++++++----------
...r.java => SingleObjectColumnValueSelector.java} | 15 +--
12 files changed, 220 insertions(+), 117 deletions(-)
diff --git
a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java
b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java
index b6d7d029be..a366675dc0 100644
---
a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java
+++
b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java
@@ -30,7 +30,6 @@ import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.data.ReadableOffset;
import javax.annotation.Nullable;
-
import java.io.IOException;
/**
@@ -40,6 +39,7 @@ public class CompressedBigDecimalColumn implements
ComplexColumn
{
public static final Logger LOGGER = new
Logger(CompressedBigDecimalColumn.class);
+ private final int length;
private final ColumnarInts scale;
private final ColumnarMultiInts magnitude;
@@ -49,8 +49,9 @@ public class CompressedBigDecimalColumn implements
ComplexColumn
* @param scale scale of the rows
* @param magnitude LongColumn representing magnitudes
*/
- public CompressedBigDecimalColumn(ColumnarInts scale, ColumnarMultiInts
magnitude)
+ public CompressedBigDecimalColumn(int length, ColumnarInts scale,
ColumnarMultiInts magnitude)
{
+ this.length = length;
this.scale = scale;
this.magnitude = magnitude;
}
@@ -87,7 +88,7 @@ public class CompressedBigDecimalColumn implements
ComplexColumn
@Override
public int getLength()
{
- return scale.size();
+ return length;
}
@Override
diff --git
a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java
b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java
index 0b58827e8a..c51fbc3384 100644
---
a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java
+++
b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java
@@ -26,6 +26,7 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
import
org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSupplier;
+
import java.nio.ByteBuffer;
/**
@@ -33,27 +34,8 @@ import java.nio.ByteBuffer;
*/
public class CompressedBigDecimalColumnPartSupplier implements
Supplier<ComplexColumn>
{
-
public static final int VERSION = 0x1;
- private final CompressedVSizeColumnarIntsSupplier scaleSupplier;
- private final V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier;
-
- /**
- * Constructor.
- *
- * @param scaleSupplier scale supplier
- * @param magnitudeSupplier supplied of results
- */
- public CompressedBigDecimalColumnPartSupplier(
- CompressedVSizeColumnarIntsSupplier scaleSupplier,
- V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier
- )
- {
- this.scaleSupplier = scaleSupplier;
- this.magnitudeSupplier = magnitudeSupplier;
- }
-
/**
* Compressed.
*
@@ -67,23 +49,50 @@ public class CompressedBigDecimalColumnPartSupplier
implements Supplier<ComplexC
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == VERSION) {
+ int positionStart = buffer.position();
CompressedVSizeColumnarIntsSupplier scaleSupplier =
CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
buffer,
- IndexIO.BYTE_ORDER);
+ IndexIO.BYTE_ORDER
+ );
V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier =
V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer,
IndexIO.BYTE_ORDER);
- return new CompressedBigDecimalColumnPartSupplier(scaleSupplier,
magnitudeSupplier);
+ return new CompressedBigDecimalColumnPartSupplier(
+ buffer.position() - positionStart,
+ scaleSupplier,
+ magnitudeSupplier
+ );
} else {
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
}
+ private final int byteSize;
+ private final CompressedVSizeColumnarIntsSupplier scaleSupplier;
+ private final V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier;
+
+ /**
+ * Constructor.
+ *
+ * @param scaleSupplier scale supplier
+ * @param magnitudeSupplier supplied of results
+ */
+ public CompressedBigDecimalColumnPartSupplier(
+ int byteSize,
+ CompressedVSizeColumnarIntsSupplier scaleSupplier,
+ V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier
+ )
+ {
+ this.byteSize = byteSize;
+ this.scaleSupplier = scaleSupplier;
+ this.magnitudeSupplier = magnitudeSupplier;
+ }
+
@Override
public ComplexColumn get()
{
- return new CompressedBigDecimalColumn(scaleSupplier.get(),
magnitudeSupplier.get());
+ return new CompressedBigDecimalColumn(byteSize, scaleSupplier.get(),
magnitudeSupplier.get());
}
}
diff --git
a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnTest.java
b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnTest.java
index b779c63180..d9a3f6e971 100644
---
a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnTest.java
+++
b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnTest.java
@@ -35,6 +35,7 @@ public class CompressedBigDecimalColumnTest
ColumnarInts columnarInts = EasyMock.createMock(ColumnarInts.class);
ReadableOffset readableOffset = EasyMock.createMock(ReadableOffset.class);
CompressedBigDecimalColumn compressedBigDecimalColumn = new
CompressedBigDecimalColumn(
+ 12345,
columnarInts,
columnarMultiInts
);
@@ -42,7 +43,7 @@ public class CompressedBigDecimalColumnTest
CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL,
compressedBigDecimalColumn.getTypeName()
);
- Assert.assertEquals(0, compressedBigDecimalColumn.getLength());
+ Assert.assertEquals(12345, compressedBigDecimalColumn.getLength());
Assert.assertEquals(CompressedBigDecimalColumn.class,
compressedBigDecimalColumn.getClazz());
Assert.assertNotNull(compressedBigDecimalColumn.makeColumnValueSelector(readableOffset));
}
diff --git
a/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java
b/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java
index 1f6396badd..f7b662d42b 100644
---
a/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java
+++
b/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java
@@ -175,7 +175,7 @@ public class ComplexFrameColumnReader implements
FrameColumnReader
@Override
public int getLength()
{
- return frame.numRows();
+ return (int) frame.numBytes();
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
index deb1623701..2cc60843f9 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.aggregation;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
@@ -28,6 +29,7 @@ import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
+import
org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
@@ -49,6 +51,20 @@ import java.util.Comparator;
*/
public class SerializablePairLongStringComplexMetricSerde extends
ComplexMetricSerde
{
+ /**
+ * This is a configuration parameter to allow for turning on compression.
It is a hack, it would be significantly
+ * better if this could be delivered via properties. The number one reason
this is a hack is because it reads
+ * the System.getProperty which doesn't actually have runtime.properties
files put into it, so this setting
+ * could be set in runtime.properties and this code wouldn't see it, because
that's not how it is wired up.
+ *
+ * The intent of this parameter is so that Druid 25 can be released using
the legacy serialization format. This
+ * will allow us to get code released that can *read* both the legacy and
the new format. Then, in Druid 26,
+ * we can completely eliminate this boolean and start to only *write* the
new format, in which case this
+ * hack of a configuration property disappears.
+ */
+ private static final boolean COMPRESSION_ENABLED =
+
Boolean.parseBoolean(System.getProperty("druid.columns.pairLongString.compressed",
"false"));
+
public static final int EXPECTED_VERSION = 3;
public static final String TYPE_NAME = "serializablePairLongString";
// Null SerializablePairLongString values are put first
@@ -60,6 +76,17 @@ public class SerializablePairLongStringComplexMetricSerde
extends ComplexMetricS
private static final SerializablePairLongStringSimpleStagedSerde SERDE =
new SerializablePairLongStringSimpleStagedSerde();
+ private final boolean compressionEnabled;
+
+ public SerializablePairLongStringComplexMetricSerde()
+ {
+ this(COMPRESSION_ENABLED);
+ }
+
+ public SerializablePairLongStringComplexMetricSerde(boolean
compressionEnabled)
+ {
+ this.compressionEnabled = compressionEnabled;
+ }
@Override
public String getTypeName()
@@ -92,7 +119,7 @@ public class SerializablePairLongStringComplexMetricSerde
extends ComplexMetricS
byte version = buffer.get(buffer.position());
if (version == 0 || version == 1 || version == 2) {
- GenericIndexed<?> column = GenericIndexed.read(buffer,
getObjectStrategy(), columnBuilder.getFileMapper());
+ GenericIndexed<?> column = GenericIndexed.read(buffer, LEGACY_STRATEGY,
columnBuilder.getFileMapper());
columnBuilder.setComplexColumnSupplier(new
ComplexColumnPartSupplier(getTypeName(), column));
} else {
SerializablePairLongStringComplexColumn.Builder builder =
@@ -141,9 +168,73 @@ public class SerializablePairLongStringComplexMetricSerde
extends ComplexMetricS
@Override
public GenericColumnSerializer<?> getSerializer(SegmentWriteOutMedium
segmentWriteOutMedium, String column)
{
- return new SerializablePairLongStringColumnSerializer(
- segmentWriteOutMedium,
- NativeClearedByteBufferProvider.INSTANCE
- );
+ if (compressionEnabled) {
+ return new SerializablePairLongStringColumnSerializer(
+ segmentWriteOutMedium,
+ NativeClearedByteBufferProvider.INSTANCE
+ );
+ } else {
+ return LargeColumnSupportedComplexColumnSerializer.create(
+ segmentWriteOutMedium,
+ column,
+ LEGACY_STRATEGY
+ );
+ }
}
+
+ private static final ObjectStrategy<SerializablePairLongString>
LEGACY_STRATEGY =
+ new ObjectStrategy<SerializablePairLongString>()
+ {
+ @Override
+ public int compare(@Nullable SerializablePairLongString o1, @Nullable
SerializablePairLongString o2)
+ {
+ return COMPARATOR.compare(o1, o2);
+ }
+
+ @Override
+ public Class<? extends SerializablePairLongString> getClazz()
+ {
+ return SerializablePairLongString.class;
+ }
+
+ @Override
+ public SerializablePairLongString fromByteBuffer(ByteBuffer buffer,
int numBytes)
+ {
+ final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
+
+ long lhs = readOnlyBuffer.getLong();
+ int stringSize = readOnlyBuffer.getInt();
+
+ String lastString = null;
+ if (stringSize > 0) {
+ byte[] stringBytes = new byte[stringSize];
+ readOnlyBuffer.get(stringBytes, 0, stringSize);
+ lastString = StringUtils.fromUtf8(stringBytes);
+ }
+
+ return new SerializablePairLongString(lhs, lastString);
+ }
+
+ @Override
+ public byte[] toBytes(SerializablePairLongString val)
+ {
+ String rhsString = val.rhs;
+ ByteBuffer bbuf;
+
+ if (rhsString != null) {
+ byte[] rhsBytes = StringUtils.toUtf8(rhsString);
+ bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES +
rhsBytes.length);
+ bbuf.putLong(val.lhs);
+ bbuf.putInt(Long.BYTES, rhsBytes.length);
+ bbuf.position(Long.BYTES + Integer.BYTES);
+ bbuf.put(rhsBytes);
+ } else {
+ bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
+ bbuf.putLong(val.lhs);
+ bbuf.putInt(Long.BYTES, 0);
+ }
+
+ return bbuf.array();
+ }
+ };
}
diff --git
a/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java
b/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java
index 88cc5dcb8f..041d8566d3 100644
---
a/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java
+++
b/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java
@@ -96,7 +96,7 @@ public class SegmentAnalyzer
final StorageAdapter storageAdapter = segment.asStorageAdapter();
// get length and column names from storageAdapter
- final int length = storageAdapter.getNumRows();
+ final int numRows = storageAdapter.getNumRows();
// Use LinkedHashMap to preserve column order.
final Map<String, ColumnAnalysis> columns = new LinkedHashMap<>();
@@ -119,13 +119,13 @@ public class SegmentAnalyzer
final int bytesPerRow =
ColumnHolder.TIME_COLUMN_NAME.equals(columnName) ?
NUM_BYTES_IN_TIMESTAMP : Long.BYTES;
- analysis = analyzeNumericColumn(capabilities, length, bytesPerRow);
+ analysis = analyzeNumericColumn(capabilities, numRows, bytesPerRow);
break;
case FLOAT:
- analysis = analyzeNumericColumn(capabilities, length,
NUM_BYTES_IN_TEXT_FLOAT);
+ analysis = analyzeNumericColumn(capabilities, numRows,
NUM_BYTES_IN_TEXT_FLOAT);
break;
case DOUBLE:
- analysis = analyzeNumericColumn(capabilities, length, Double.BYTES);
+ analysis = analyzeNumericColumn(capabilities, numRows, Double.BYTES);
break;
case STRING:
if (index != null) {
@@ -136,7 +136,7 @@ public class SegmentAnalyzer
break;
case COMPLEX:
final ColumnHolder columnHolder = index != null ?
index.getColumnHolder(columnName) : null;
- analysis = analyzeComplexColumn(capabilities, columnHolder);
+ analysis = analyzeComplexColumn(capabilities, numRows, columnHolder);
break;
default:
log.warn("Unknown column type[%s].", capabilities.asTypeString());
@@ -330,6 +330,7 @@ public class SegmentAnalyzer
private ColumnAnalysis analyzeComplexColumn(
@Nullable final ColumnCapabilities capabilities,
+ final int numCells,
@Nullable final ColumnHolder columnHolder
)
{
@@ -362,8 +363,7 @@ public class SegmentAnalyzer
);
}
- final int length = complexColumn.getLength();
- for (int i = 0; i < length; ++i) {
+ for (int i = 0; i < numCells; ++i) {
size += inputSizeFn.apply(complexColumn.getRowValue(i));
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java
b/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java
index 0122f1e535..b0f3a4388c 100644
---
a/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java
@@ -58,7 +58,7 @@ public interface ComplexColumn extends BaseColumn
Object getRowValue(int rowNum);
/**
- * @return serialized size (in bytes) of this column.
+ * @return serialized size (in bytes) of this column. -1 for unknown
*/
int getLength();
diff --git
a/processing/src/main/java/org/apache/druid/segment/column/GenericIndexedBasedComplexColumn.java
b/processing/src/main/java/org/apache/druid/segment/column/GenericIndexedBasedComplexColumn.java
index f3ec134117..4b1654d888 100644
---
a/processing/src/main/java/org/apache/druid/segment/column/GenericIndexedBasedComplexColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/column/GenericIndexedBasedComplexColumn.java
@@ -58,7 +58,7 @@ public class GenericIndexedBasedComplexColumn implements
ComplexColumn
@Override
public int getLength()
{
- return index.size();
+ return -1;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/segment/column/UnknownTypeComplexColumn.java
b/processing/src/main/java/org/apache/druid/segment/column/UnknownTypeComplexColumn.java
index ce4db70438..3f11b779ea 100644
---
a/processing/src/main/java/org/apache/druid/segment/column/UnknownTypeComplexColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/column/UnknownTypeComplexColumn.java
@@ -59,7 +59,7 @@ public class UnknownTypeComplexColumn implements ComplexColumn
@Override
public int getLength()
{
- return 0;
+ return -1;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
index e062d1e94c..c4682d2071 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
@@ -281,7 +281,7 @@ public final class
CompressedNestedDataComplexColumn<TStringDictionary extends I
@Override
public int getLength()
{
- return 0;
+ return -1;
}
@Override
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java
b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java
index c8605f4695..f503874a4d 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java
@@ -20,55 +20,50 @@
package org.apache.druid.query.aggregation;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.serde.cell.RandomStringUtils;
import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicReference;
public class SerializablePairLongStringComplexMetricSerdeTest
{
- private static final SerializablePairLongStringComplexMetricSerde
COMPLEX_METRIC_SERDE =
+ static {
+ NullHandling.initializeForTests();
+ }
+
+ private static final SerializablePairLongStringComplexMetricSerde
LEGACY_SERDE =
new SerializablePairLongStringComplexMetricSerde();
+ private static final SerializablePairLongStringComplexMetricSerde
COMPRESSED_SERDE =
+ new SerializablePairLongStringComplexMetricSerde(true);
// want deterministic test input
private final Random random = new Random(0);
private final RandomStringUtils randomStringUtils = new
RandomStringUtils(random);
- private GenericColumnSerializer<SerializablePairLongString> serializer;
-
- @SuppressWarnings("unchecked")
- @Before
- public void setup()
- {
- SegmentWriteOutMedium writeOutMedium = new
OnHeapMemorySegmentWriteOutMedium();
- serializer = (GenericColumnSerializer<SerializablePairLongString>)
COMPLEX_METRIC_SERDE.getSerializer(
- writeOutMedium,
- "not-used"
- );
- }
-
@Test
public void testSingle() throws Exception
{
- assertExpected(ImmutableList.of(new SerializablePairLongString(100L,
"fuu")), 77);
+ assertExpected(ImmutableList.of(new SerializablePairLongString(100L,
"fuu")), 33, 77);
}
@Test
@@ -78,7 +73,7 @@ public class SerializablePairLongStringComplexMetricSerdeTest
assertExpected(ImmutableList.of(new SerializablePairLongString(
100L,
randomStringUtils.randomAlphanumeric(2 * 1024 * 1024)
- )), 2103140);
+ )), 2097182, 2103140);
}
@Test
@@ -95,8 +90,7 @@ public class SerializablePairLongStringComplexMetricSerdeTest
valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long)
i, stringList.get(i % numStrings)));
}
- //actual input bytes in naive encoding is ~10mb
- assertExpected(valueList, 1746026);
+ assertExpected(valueList, 10440010, 1746026);
}
@Test
@@ -109,8 +103,7 @@ public class
SerializablePairLongStringComplexMetricSerdeTest
valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long)
i, stringValue));
}
- //actual input bytes in naive encoding is ~10mb
- assertExpected(valueList, 289645);
+ assertExpected(valueList, 10440010, 289645);
}
@Test
@@ -122,81 +115,109 @@ public class
SerializablePairLongStringComplexMetricSerdeTest
valueList.add(new SerializablePairLongString(random.nextLong(),
randomStringUtils.randomAlphanumeric(1024)));
}
- assertExpected(valueList, 10428975);
+ assertExpected(valueList, 10440010, 10428975);
}
@Test
public void testNullString() throws Exception
{
- assertExpected(ImmutableList.of(new SerializablePairLongString(100L,
null)), 74);
+ assertExpected(ImmutableList.of(new SerializablePairLongString(100L,
null)), 30, 74);
}
@Test
public void testEmpty() throws Exception
{
// minimum size for empty data
- assertExpected(Collections.emptyList(), 57);
+ assertExpected(Collections.emptyList(), 10, 57);
}
@Test
public void testSingleNull() throws Exception
{
- assertExpected(Arrays.asList(new SerializablePairLongString[]{null}), 58);
+ assertExpected(Arrays.asList(new SerializablePairLongString[]{null}), 18,
58);
}
@Test
public void testMultipleNull() throws Exception
{
- assertExpected(Arrays.asList(null, null, null, null), 59);
- }
-
- private void assertExpected(List<SerializablePairLongString> expected)
throws IOException
- {
- assertExpected(expected, -1);
+ assertExpected(Arrays.asList(null, null, null, null), 42, 59);
}
- private void assertExpected(List<SerializablePairLongString> expected, int
expectedSize) throws IOException
+ private ByteBuffer assertExpected(
+ List<SerializablePairLongString> expected,
+ int expectedLegacySize,
+ int expectedCompressedSize
+ ) throws IOException
{
- List<SerializablePairLongStringValueSelector> valueSelectors =
-
expected.stream().map(SerializablePairLongStringValueSelector::new).collect(Collectors.toList());
- ByteBuffer byteBuffer = serializeAllValuesToByteBuffer(valueSelectors,
serializer, expectedSize);
-
- try (SerializablePairLongStringComplexColumn complexColumn =
createComplexColumn(byteBuffer)) {
- for (int i = 0; i < valueSelectors.size(); i++) {
- Assert.assertEquals(expected.get(i), complexColumn.getRowValue(i));
+ SegmentWriteOutMedium writeOutMedium = new
OnHeapMemorySegmentWriteOutMedium();
+ ByteBuffer legacyBuffer = serializeAllValuesToByteBuffer(
+ expected,
+ LEGACY_SERDE.getSerializer(writeOutMedium, "not-used"),
+ expectedLegacySize
+ ).asReadOnlyBuffer();
+ ByteBuffer compressedBuffer = serializeAllValuesToByteBuffer(
+ expected,
+ COMPRESSED_SERDE.getSerializer(writeOutMedium, "not-used"),
+ expectedCompressedSize
+ ).asReadOnlyBuffer();
+
+ try (ComplexColumn legacyCol = createComplexColumn(legacyBuffer);
+ ComplexColumn compressedCol = createComplexColumn(compressedBuffer)
+ ) {
+ for (int i = 0; i < expected.size(); i++) {
+ Assert.assertEquals(expected.get(i), legacyCol.getRowValue(i));
+ Assert.assertEquals(expected.get(i), compressedCol.getRowValue(i));
}
}
+ return compressedBuffer;
}
- private SerializablePairLongStringComplexColumn
createComplexColumn(ByteBuffer byteBuffer)
+ private ComplexColumn createComplexColumn(ByteBuffer byteBuffer)
{
ColumnBuilder builder = new ColumnBuilder();
int serializedSize = byteBuffer.remaining();
- COMPLEX_METRIC_SERDE.deserializeColumn(byteBuffer, builder);
+ LEGACY_SERDE.deserializeColumn(byteBuffer, builder);
builder.setType(ValueType.COMPLEX);
ColumnHolder columnHolder = builder.build();
- SerializablePairLongStringComplexColumn column =
(SerializablePairLongStringComplexColumn) columnHolder.getColumn();
-
- Assert.assertEquals(serializedSize, column.getLength());
- Assert.assertEquals("serializablePairLongString", column.getTypeName());
- Assert.assertEquals(SerializablePairLongString.class, column.getClazz());
+ final ComplexColumn col = (ComplexColumn) columnHolder.getColumn();
+ if (col instanceof SerializablePairLongStringComplexColumn) {
+ Assert.assertEquals(serializedSize, col.getLength());
+ }
+ Assert.assertEquals("serializablePairLongString", col.getTypeName());
+ Assert.assertEquals(SerializablePairLongString.class, col.getClazz());
- return column;
+ return col;
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
private static ByteBuffer serializeAllValuesToByteBuffer(
- Collection<SerializablePairLongStringValueSelector> valueSelectors,
- GenericColumnSerializer<SerializablePairLongString> serializer,
+ List<SerializablePairLongString> values,
+ GenericColumnSerializer serializer,
int expectedSize
) throws IOException
{
serializer.open();
- for (SerializablePairLongStringValueSelector valueSelector :
valueSelectors) {
+ final AtomicReference<SerializablePairLongString> reference = new
AtomicReference<>(null);
+ ColumnValueSelector<SerializablePairLongString> valueSelector =
+ new SingleObjectColumnValueSelector<SerializablePairLongString>(
+ SerializablePairLongString.class
+ )
+ {
+ @Nullable
+ @Override
+ public SerializablePairLongString getObject()
+ {
+ return reference.get();
+ }
+ };
+
+ for (SerializablePairLongString selector : values) {
+ reference.set(selector);
serializer.serialize(valueSelector);
}
@@ -225,13 +246,4 @@ public class
SerializablePairLongStringComplexMetricSerdeTest
return byteBuffer;
}
-
- private static class SerializablePairLongStringValueSelector
- extends SingleValueColumnValueSelector<SerializablePairLongString>
- {
- public SerializablePairLongStringValueSelector(SerializablePairLongString
value)
- {
- super(SerializablePairLongString.class, value);
- }
- }
}
diff --git
a/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java
b/processing/src/test/java/org/apache/druid/query/aggregation/SingleObjectColumnValueSelector.java
similarity index 83%
rename from
processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java
rename to
processing/src/test/java/org/apache/druid/query/aggregation/SingleObjectColumnValueSelector.java
index 8c9d232ca9..5869a6b871 100644
---
a/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java
+++
b/processing/src/test/java/org/apache/druid/query/aggregation/SingleObjectColumnValueSelector.java
@@ -22,17 +22,13 @@ package org.apache.druid.query.aggregation;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
-import javax.annotation.Nullable;
-
-public class SingleValueColumnValueSelector<T> implements
ColumnValueSelector<T>
+public abstract class SingleObjectColumnValueSelector<T> implements
ColumnValueSelector<T>
{
private final Class<T> valueClass;
- private final T value;
- public SingleValueColumnValueSelector(Class<T> valueClass, T value)
+ public SingleObjectColumnValueSelector(Class<T> valueClass)
{
this.valueClass = valueClass;
- this.value = value;
}
@Override
@@ -64,13 +60,6 @@ public class SingleValueColumnValueSelector<T> implements
ColumnValueSelector<T>
return false;
}
- @Nullable
- @Override
- public T getObject()
- {
- return value;
- }
-
@Override
public Class<? extends T> classOfObject()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]