This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 58a8a23243f Avoid conversion to String in JsonReader, JsonNodeReader. (#15693)
58a8a23243f is described below
commit 58a8a23243f956e0bf22651767fd70bc333d0621
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Mar 26 08:16:05 2024 -0700
Avoid conversion to String in JsonReader, JsonNodeReader. (#15693)
* Avoid conversion to String in JsonReader, JsonNodeReader.
These readers were running UTF-8 decode on the provided entity to
convert it to a String, then parsing the String as JSON. The patch
changes them to parse the provided entity's input stream directly.
In order to preserve the nice error messages that include parse errors,
the readers now need to open the entity again on the error path, to
re-read the data. To make this possible, the InputEntity#open contract
is tightened to require the ability to re-open entities, and existing
InputEntity implementations are updated to allow re-opening.
This patch also renames JsonLineReaderBenchmark to JsonInputFormatBenchmark,
updates it to benchmark all three JSON readers, and adds a case that reads
fields out of the parsed row (not just creates it).
* Fixes for static analysis.
* Implement intermediateRowAsString in JsonReader.
* Enhanced JsonInputFormatBenchmark.
Renames JsonLineReaderBenchmark to JsonInputFormatBenchmark, and enhances
it to test various readers (JsonReader, JsonLineReader, JsonNodeReader) as
well as to test with/without field discovery.
---
benchmarks/pom.xml | 5 +
.../druid/benchmark/JsonInputFormatBenchmark.java | 319 +++++++++++++++++++++
.../druid/benchmark/JsonLineReaderBenchmark.java | 174 -----------
.../druid/indexing/input/InputRowSchemas.java | 4 +-
.../seekablestream/SeekableStreamSamplerSpec.java | 3 +-
.../seekablestream/SettableByteEntity.java | 26 +-
.../indexing/seekablestream/StreamChunkParser.java | 2 +-
.../org/apache/druid/data/input/InputEntity.java | 4 +-
.../apache/druid/data/input/impl/ByteEntity.java | 13 +-
.../druid/data/input/impl/JsonNodeReader.java | 16 +-
.../apache/druid/data/input/impl/JsonReader.java | 60 ++--
11 files changed, 393 insertions(+), 233 deletions(-)
diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index c9a4768d4bd..56bd3700a58 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -65,6 +65,11 @@
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-indexing-service</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/JsonInputFormatBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/JsonInputFormatBenchmark.java
new file mode 100644
index 00000000000..df8d2aa126b
--- /dev/null
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/JsonInputFormatBenchmark.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.JsonLineReader;
+import org.apache.druid.data.input.impl.JsonNodeReader;
+import org.apache.druid.data.input.impl.JsonReader;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.input.InputRowSchemas;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.RowAdapter;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+/**
+ * Tests {@link JsonInputFormat} delegates, one per {@link ReaderType}.
+ *
+ * Output is in nanoseconds per parse (or parse and read) of {@link
#DATA_STRING}.
+ */
+@State(Scope.Benchmark)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@OperationsPerInvocation(JsonInputFormatBenchmark.NUM_EVENTS)
+@Warmup(iterations = 5)
+@Measurement(iterations = 10)
+@Fork(value = 1)
+public class JsonInputFormatBenchmark
+{
+ enum ReaderType
+ {
+ READER(JsonReader.class) {
+ @Override
+ public JsonInputFormat createFormat(JSONPathSpec flattenSpec)
+ {
+ return new JsonInputFormat(flattenSpec, null, null, false,
false).withLineSplittable(false);
+ }
+ },
+ LINE_READER(JsonLineReader.class) {
+ @Override
+ public JsonInputFormat createFormat(JSONPathSpec flattenSpec)
+ {
+ return new JsonInputFormat(flattenSpec, null, null, null,
null).withLineSplittable(true);
+ }
+ },
+ NODE_READER(JsonNodeReader.class) {
+ @Override
+ public JsonInputFormat createFormat(JSONPathSpec flattenSpec)
+ {
+ return new JsonInputFormat(flattenSpec, null, null, false,
true).withLineSplittable(false);
+ }
+ };
+
+ private final Class<? extends InputEntityReader> clazz;
+
+ ReaderType(Class<? extends InputEntityReader> clazz)
+ {
+ this.clazz = clazz;
+ }
+
+ public abstract JsonInputFormat createFormat(JSONPathSpec flattenSpec);
+ }
+
+ public static final int NUM_EVENTS = 1000;
+
+ private static final String DATA_STRING =
+ "{" +
+ "\"stack\":\"mainstack\"," +
+ "\"metadata\":" +
+ "{" +
+ "\"application\":\"applicationname\"," +
+ "\"detail\":\"tm\"," +
+
"\"id\":\"123456789012345678901234567890346973eb4c30eca8a4df79c8219d152cfe0d7d6bdb11a12e609c0c\","
+
+
"\"idtwo\":\"123456789012345678901234567890346973eb4c30eca8a4df79c8219d152cfe0d7d6bdb11a12e609c0c\","
+
+ "\"sequence\":\"v008\"," +
+ "\"stack\":\"mainstack\"," +
+ "\"taskId\":\"12345678-1234-1234-1234-1234567890ab\"," +
+ "\"taskIdTwo\":\"12345678-1234-1234-1234-1234567890ab\"" +
+ "}," +
+ "\"_cluster_\":\"kafka\"," +
+ "\"_id_\":\"12345678-1234-1234-1234-1234567890ab\"," +
+ "\"_offset_\":12111398526," +
+ "\"type\":\"CUMULATIVE_DOUBLE\"," +
+ "\"version\":\"v1\"," +
+ "\"timestamp\":1670425782281," +
+ "\"point\":{\"seconds\":1670425782,\"nanos\":217000000,\"value\":0}," +
+ "\"_kafka_timestamp_\":1670425782304," +
+ "\"_partition_\":60," +
+ "\"ec2_instance_id\":\"i-1234567890\"," +
+ "\"name\":\"packets_received\"," +
+ "\"_topic_\":\"test_topic\"}";
+
+ private static final List<String> FIELDS_TO_READ =
+ ImmutableList.of(
+ "stack",
+ "_cluster_",
+ "_id_",
+ "_offset_",
+ "type",
+ "version",
+ "_kafka_timestamp_",
+ "_partition_",
+ "ec2_instance_id",
+ "name",
+ "_topic",
+ "root_type",
+ "path_app",
+ "jq_app"
+ );
+
+ ReaderType readerType;
+ InputRowSchema inputRowSchema;
+ InputEntityReader reader;
+ JsonInputFormat format;
+ List<Function<InputRow, Object>> fieldFunctions;
+ ByteEntity data;
+
+ @Param({"reader", "node_reader", "line_reader"})
+ private String readerTypeString;
+
+ /**
+ * If false: only read {@link #FIELDS_TO_READ}. If true: discover and read
all fields.
+ */
+ @Param({"false", "true"})
+ private boolean discovery;
+
+ @Setup
+ public void setUpTrial() throws Exception
+ {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final byte[] dataUtf8 = StringUtils.toUtf8(DATA_STRING);
+
+ for (int i = 0; i < NUM_EVENTS; i++) {
+ baos.write(dataUtf8);
+ baos.write(new byte[]{'\n'});
+ }
+
+ final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso",
null);
+ final DimensionsSpec dimensionsSpec;
+
+ if (discovery) {
+ // Discovered schema, excluding uninteresting fields that are not in
FIELDS_TO_READ.
+ final Set<String> exclusions = Sets.difference(
+ TestHelper.makeJsonMapper()
+ .readValue(DATA_STRING,
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)
+ .keySet(),
+ ImmutableSet.copyOf(FIELDS_TO_READ)
+ );
+
+ dimensionsSpec = DimensionsSpec.builder()
+ .useSchemaDiscovery(true)
+
.setDimensionExclusions(ImmutableList.copyOf(exclusions))
+ .build();
+ } else {
+ // Fully defined schema.
+ dimensionsSpec = DimensionsSpec.builder()
+
.setDimensions(DimensionsSpec.getDefaultSchemas(FIELDS_TO_READ))
+ .build();
+ }
+
+ data = new ByteEntity(baos.toByteArray());
+ readerType = ReaderType.valueOf(StringUtils.toUpperCase(readerTypeString));
+ inputRowSchema = new InputRowSchema(
+ timestampSpec,
+ dimensionsSpec,
+ InputRowSchemas.createColumnsFilter(
+ timestampSpec,
+ dimensionsSpec,
+ TransformSpec.NONE,
+ new AggregatorFactory[0]
+ )
+ );
+ format = readerType.createFormat(
+ new JSONPathSpec(
+ true,
+ ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_type",
"type"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_app",
"$.metadata.application"),
+ new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_app",
".metadata.application")
+ )
+ )
+ );
+
+ final RowAdapter<InputRow> rowAdapter =
format.createRowAdapter(inputRowSchema);
+ fieldFunctions = new ArrayList<>(FIELDS_TO_READ.size());
+
+ for (final String field : FIELDS_TO_READ) {
+ fieldFunctions.add(rowAdapter.columnFunction(field));
+ }
+
+ reader = format.createReader(inputRowSchema, data, null);
+
+ if (reader.getClass() != readerType.clazz) {
+ throw new ISE(
+ "Expected class[%s] for readerType[%s], got[%s]",
+ readerType.clazz,
+ readerTypeString,
+ reader.getClass()
+ );
+ }
+ }
+
+ /**
+ * Benchmark parsing, but not reading fields.
+ */
+ @Benchmark
+ public void parse(final Blackhole blackhole) throws IOException
+ {
+ data.getBuffer().rewind();
+
+ int counted = 0;
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ while (iterator.hasNext()) {
+ final InputRow row = iterator.next();
+ if (row != null) {
+ counted += 1;
+ blackhole.consume(row);
+ }
+ }
+ }
+
+ if (counted != NUM_EVENTS) {
+ throw new RuntimeException("invalid number of loops, counted = " +
counted);
+ }
+ }
+
+ /**
+ * Benchmark parsing and reading {@link #FIELDS_TO_READ}. More realistic
than {@link #parse(Blackhole)}.
+ */
+ @Benchmark
+ public void parseAndRead(final Blackhole blackhole) throws IOException
+ {
+ data.getBuffer().rewind();
+
+ int counted = 0;
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ while (iterator.hasNext()) {
+ final InputRow row = iterator.next();
+ if (row != null) {
+ for (Function<InputRow, Object> fieldFunction : fieldFunctions) {
+ blackhole.consume(fieldFunction.apply(row));
+ }
+
+ counted += 1;
+ }
+ }
+ }
+
+ if (counted != NUM_EVENTS) {
+ throw new RuntimeException("invalid number of loops, counted = " +
counted);
+ }
+ }
+
+ public static void main(String[] args) throws RunnerException
+ {
+ Options opt = new OptionsBuilder()
+ .include(JsonInputFormatBenchmark.class.getSimpleName())
+ .build();
+
+ new Runner(opt).run();
+ }
+}
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/JsonLineReaderBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/JsonLineReaderBenchmark.java
deleted file mode 100644
index 44b3054ccbf..00000000000
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/JsonLineReaderBenchmark.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.benchmark;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.data.input.ColumnsFilter;
-import org.apache.druid.data.input.InputEntityReader;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.impl.ByteEntity;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.JsonInputFormat;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
-import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
-import org.apache.druid.java.util.common.parsers.JSONPathSpec;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.Warmup;
-import org.openjdk.jmh.infra.Blackhole;
-import org.openjdk.jmh.runner.Runner;
-import org.openjdk.jmh.runner.RunnerException;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-@State(Scope.Benchmark)
-@BenchmarkMode(Mode.AverageTime)
-@Warmup(iterations = 10)
-@Measurement(iterations = 25)
-@Fork(value = 1)
-public class JsonLineReaderBenchmark
-{
- private static final int NUM_EVENTS = 1000;
-
- InputEntityReader reader;
- JsonInputFormat format;
- byte[] data;
-
- @Setup(Level.Invocation)
- public void prepareReader()
- {
- ByteEntity source = new ByteEntity(data);
- reader = format.createReader(
- new InputRowSchema(
- new TimestampSpec("timestamp", "iso", null),
- new
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar",
"foo"))),
- ColumnsFilter.all()
- ),
- source,
- null
- );
- }
-
- @Setup
- public void prepareData() throws Exception
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- String dataString = "{" +
- "\"stack\":\"mainstack\"," +
- "\"metadata\":" +
- "{" +
- "\"application\":\"applicationname\"," +
- "\"detail\":\"tm\"," +
-
"\"id\":\"123456789012345678901234567890346973eb4c30eca8a4df79c8219d152cfe0d7d6bdb11a12e609c0c\","
+
-
"\"idtwo\":\"123456789012345678901234567890346973eb4c30eca8a4df79c8219d152cfe0d7d6bdb11a12e609c0c\","
+
- "\"sequence\":\"v008\"," +
- "\"stack\":\"mainstack\"," +
- "\"taskId\":\"12345678-1234-1234-1234-1234567890ab\"," +
- "\"taskIdTwo\":\"12345678-1234-1234-1234-1234567890ab\"" +
- "}," +
- "\"_cluster_\":\"kafka\"," +
- "\"_id_\":\"12345678-1234-1234-1234-1234567890ab\"," +
- "\"_offset_\":12111398526," +
- "\"type\":\"CUMULATIVE_DOUBLE\"," +
- "\"version\":\"v1\"," +
- "\"timestamp\":1670425782281," +
-
"\"point\":{\"seconds\":1670425782,\"nanos\":217000000,\"value\":0}," +
- "\"_kafka_timestamp_\":1670425782304," +
- "\"_partition_\":60," +
- "\"ec2_instance_id\":\"i-1234567890\"," +
- "\"name\":\"packets_received\"," +
- "\"_topic_\":\"test_topic\"}";
- for (int i = 0; i < NUM_EVENTS; i++) {
- baos.write(StringUtils.toUtf8(dataString));
- baos.write(new byte[]{'\n'});
- }
-
- data = baos.toByteArray();
- }
-
- @Setup
- public void prepareFormat()
- {
- format = new JsonInputFormat(
- new JSONPathSpec(
- true,
- ImmutableList.of(
- new JSONPathFieldSpec(JSONPathFieldType.ROOT,
"root_baz", "baz"),
- new JSONPathFieldSpec(JSONPathFieldType.ROOT,
"root_baz2", "baz2"),
- new JSONPathFieldSpec(JSONPathFieldType.PATH,
"path_omg", "$.o.mg"),
- new JSONPathFieldSpec(JSONPathFieldType.PATH,
"path_omg2", "$.o.mg2"),
- new JSONPathFieldSpec(JSONPathFieldType.JQ,
"jq_omg", ".o.mg"),
- new JSONPathFieldSpec(JSONPathFieldType.JQ,
"jq_omg2", ".o.mg2")
- )
- ),
- null,
- null,
- null,
- null
- );
- }
-
- @Benchmark
- @BenchmarkMode(Mode.AverageTime)
- @OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void baseline(final Blackhole blackhole) throws IOException
- {
- int counted = 0;
- try (CloseableIterator<InputRow> iterator = reader.read()) {
- while (iterator.hasNext()) {
- final InputRow row = iterator.next();
- if (row != null) {
- counted += 1;
- }
- blackhole.consume(row);
- }
- }
-
- if (counted != NUM_EVENTS) {
- throw new RuntimeException("invalid number of loops, counted = " +
counted);
- }
- }
-
- public static void main(String[] args) throws RunnerException
- {
- Options opt = new OptionsBuilder()
- .include(JsonLineReaderBenchmark.class.getSimpleName())
- .build();
-
- new Runner(opt).run();
- }
-}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
index c895eb14b71..3a04b71e2f1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.input;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -69,8 +68,7 @@ public class InputRowSchemas
*
* @see InputRowSchema#getColumnsFilter()
*/
- @VisibleForTesting
- static ColumnsFilter createColumnsFilter(
+ public static ColumnsFilter createColumnsFilter(
final TimestampSpec timestampSpec,
final DimensionsSpec dimensionsSpec,
final TransformSpec transformSpec,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
index af850dd1050..4705f333722 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -236,7 +236,8 @@ public abstract class
SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
@Override
public InputRowListPlusRawValues next()
{
- final ByteBuffer bb = ((ByteEntity)
entityIterator.next()).getBuffer();
+ // We need to modify the position of the buffer, so duplicate it.
+ final ByteBuffer bb = ((ByteEntity)
entityIterator.next()).getBuffer().duplicate();
final Map<String, Object> rawColumns;
try {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntity.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntity.java
index 6fe347691fe..bac59d4b3f0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntity.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntity.java
@@ -37,27 +37,17 @@ import java.nio.ByteBuffer;
* processing where binary records are arriving as a list but {@link
org.apache.druid.data.input.InputEntityReader}, that
* parses the data, expects an {@link InputStream}. This class mimics a
continuous InputStream while behind the scenes,
* binary records are being put one after the other that the InputStream
consumes bytes from. One record is fully
- * consumed and only then the next record is set. This class doesn't allow
reading the same data twice.
+ * consumed and only then the next record is set.
* This class solely exists to overcome the limitations imposed by interfaces
for reading and parsing data.
- *
*/
@NotThreadSafe
public class SettableByteEntity<T extends ByteEntity> implements InputEntity
{
- private final SettableByteBufferInputStream inputStream;
- private boolean opened = false;
private T entity;
- public SettableByteEntity()
- {
- this.inputStream = new SettableByteBufferInputStream();
- }
-
public void setEntity(T entity)
{
- inputStream.setBuffer(entity.getBuffer());
this.entity = entity;
- opened = false;
}
@Nullable
@@ -72,19 +62,13 @@ public class SettableByteEntity<T extends ByteEntity>
implements InputEntity
return entity;
}
- /**
- * This method can be called multiple times only for different data. So you
can open a new input stream
- * only after a new buffer is in use.
- */
@Override
public InputStream open()
{
- if (opened) {
- throw new IllegalArgumentException("Can't open the input stream on
SettableByteEntity more than once");
- }
-
- opened = true;
- return inputStream;
+ // Duplicate the entity buffer, because the stream will update its
position.
+ final SettableByteBufferInputStream stream = new
SettableByteBufferInputStream();
+ stream.setBuffer(entity.getBuffer().duplicate());
+ return stream;
}
public static final class SettableByteBufferInputStream extends InputStream
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
index 2ee14917e33..3ac952c16c2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
@@ -133,7 +133,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
{
final FluentIterable<InputRow> iterable = FluentIterable
.from(valueBytes)
- .transform(ByteEntity::getBuffer)
+ .transform(entity -> entity.getBuffer().duplicate() /* Parsing may
need to modify buffer position */)
.transform(this::incrementProcessedBytes)
.transformAndConcat(parser::parseBatch);
diff --git
a/processing/src/main/java/org/apache/druid/data/input/InputEntity.java
b/processing/src/main/java/org/apache/druid/data/input/InputEntity.java
index 6765ae82717..d6ea7211573 100644
--- a/processing/src/main/java/org/apache/druid/data/input/InputEntity.java
+++ b/processing/src/main/java/org/apache/druid/data/input/InputEntity.java
@@ -65,9 +65,7 @@ public interface InputEntity
/**
* Opens an {@link InputStream} on the input entity directly.
* This is the basic way to read the given entity.
- *
- * The behavior of this method is only defined fort the first call to open().
- * The behavior of subsequent calls is undefined and may vary between
implementations.
+ * This method may be called multiple times to re-read the data from the
entity.
*
* @see #fetch
*/
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/ByteEntity.java
b/processing/src/main/java/org/apache/druid/data/input/impl/ByteEntity.java
index 13376912b9a..db4f3396ae4 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/ByteEntity.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/ByteEntity.java
@@ -31,9 +31,13 @@ public class ByteEntity implements InputEntity
{
private final ByteBuffer buffer;
+ /**
+ * Create a new entity. The buffer is not duplicated, so it is important to
ensure that its position and limit
+ * are not modified after this entity is created.
+ */
public ByteEntity(ByteBuffer buffer)
{
- this.buffer = buffer.duplicate();
+ this.buffer = buffer;
}
public ByteEntity(byte[] bytes)
@@ -41,6 +45,11 @@ public class ByteEntity implements InputEntity
this(ByteBuffer.wrap(bytes));
}
+ /**
+ * Return the buffer backing this entity. Calling code must not modify the
mark, position, or limit of this buffer.
+ * If you need to modify them, call {@link ByteBuffer#duplicate()} or {@link
ByteBuffer#asReadOnlyBuffer()} and
+ * modify the copy.
+ */
public ByteBuffer getBuffer()
{
return buffer;
@@ -56,6 +65,6 @@ public class ByteEntity implements InputEntity
@Override
public InputStream open()
{
- return new ByteBufferInputStream(buffer);
+ return new ByteBufferInputStream(buffer.duplicate());
}
}
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
b/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
index b5a61f69292..d45a34e8b81 100644
---
a/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
@@ -33,7 +33,6 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.java.util.common.CloseableIterators;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -43,6 +42,7 @@ import
org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -94,10 +94,8 @@ public class JsonNodeReader extends
IntermediateRowParsingReader<JsonNode>
@Override
protected CloseableIterator<JsonNode> intermediateRowIterator() throws
IOException
{
- final String sourceString = IOUtils.toString(source.open(),
StringUtils.UTF8_STRING);
final List<JsonNode> jsonNodes = new ArrayList<>();
- try {
- JsonParser parser = jsonFactory.createParser(sourceString);
+ try (final JsonParser parser = jsonFactory.createParser(source.open())) {
final MappingIterator<JsonNode> delegate = mapper.readValues(parser,
JsonNode.class);
while (delegate.hasNext()) {
jsonNodes.add(delegate.next());
@@ -107,9 +105,10 @@ public class JsonNodeReader extends
IntermediateRowParsingReader<JsonNode>
//convert Jackson's JsonParseException into druid's exception for
further processing
//JsonParseException will be thrown from MappingIterator#hasNext or
MappingIterator#next when input json text is ill-formed
if (e.getCause() instanceof JsonParseException) {
+ final String rowAsString = IOUtils.toString(source.open(),
StandardCharsets.UTF_8);
jsonNodes.add(
new ParseExceptionMarkerJsonNode(
- new ParseException(sourceString, e, "Unable to parse row
[%s]", sourceString)
+ new ParseException(rowAsString, e, "Unable to parse row [%s]",
rowAsString)
)
);
} else {
@@ -117,13 +116,14 @@ public class JsonNodeReader extends
IntermediateRowParsingReader<JsonNode>
}
}
- if (CollectionUtils.isNullOrEmpty(jsonNodes)) {
+ if (jsonNodes.isEmpty()) {
+ final String rowAsString = IOUtils.toString(source.open(),
StandardCharsets.UTF_8);
jsonNodes.add(
new ParseExceptionMarkerJsonNode(
new ParseException(
- sourceString,
+ rowAsString,
"Unable to parse [%s] as the intermediateRow resulted in
empty input row",
- sourceString
+ rowAsString
)
)
);
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
b/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
index 69f518aecad..3a0fcf45ab8 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
@@ -33,16 +33,17 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.java.util.common.CloseableIterators;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.utils.CollectionUtils;
+import javax.annotation.Nullable;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -59,7 +60,7 @@ import java.util.Map;
*
* For more information, see: https://github.com/apache/druid/pull/10383
*/
-public class JsonReader extends IntermediateRowParsingReader<String>
+public class JsonReader extends IntermediateRowParsingReader<InputEntity>
{
private final ObjectFlattener<JsonNode> flattener;
private final ObjectMapper mapper;
@@ -89,11 +90,9 @@ public class JsonReader extends
IntermediateRowParsingReader<String>
}
@Override
- protected CloseableIterator<String> intermediateRowIterator() throws
IOException
+ protected CloseableIterator<InputEntity> intermediateRowIterator()
{
- return CloseableIterators.withEmptyBaggage(
- Iterators.singletonIterator(IOUtils.toString(source.open(),
StringUtils.UTF8_STRING))
- );
+ return
CloseableIterators.withEmptyBaggage(Iterators.singletonIterator(source));
}
@Override
@@ -103,39 +102,59 @@ public class JsonReader extends
IntermediateRowParsingReader<String>
}
@Override
- protected List<InputRow> parseInputRows(String intermediateRow) throws
IOException, ParseException
+ protected String intermediateRowAsString(@Nullable InputEntity entity)
{
- final List<InputRow> inputRows;
- try (JsonParser parser = jsonFactory.createParser(intermediateRow)) {
+ if (entity == null) {
+ return "null";
+ } else {
+ try {
+ return IOUtils.toString(entity.open(), StandardCharsets.UTF_8);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ protected List<InputRow> parseInputRows(InputEntity entity) throws
IOException, ParseException
+ {
+ final List<InputRow> inputRows = new ArrayList<>();
+ try (JsonParser parser = jsonFactory.createParser(entity.open())) {
final MappingIterator<JsonNode> delegate = mapper.readValues(parser,
JsonNode.class);
- inputRows = FluentIterable.from(() -> delegate)
- .transform(jsonNode ->
MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
- .toList();
+ while (delegate.hasNext()) {
+ final JsonNode row = delegate.next();
+ inputRows.add(MapInputRowParser.parse(inputRowSchema,
flattener.flatten(row)));
+ }
}
catch (RuntimeException e) {
//convert Jackson's JsonParseException into druid's exception for
further processing
//JsonParseException will be thrown from MappingIterator#hasNext or
MappingIterator#next when input json text is ill-formed
if (e.getCause() instanceof JsonParseException) {
- throw new ParseException(intermediateRow, e, "Unable to parse row
[%s]", intermediateRow);
+ final String rowAsString = IOUtils.toString(entity.open(),
StandardCharsets.UTF_8);
+ throw new ParseException(rowAsString, e, "Unable to parse row [%s]",
rowAsString);
}
//throw unknown exception
throw e;
}
- if (CollectionUtils.isNullOrEmpty(inputRows)) {
+
+ if (inputRows.isEmpty()) {
+ final String rowAsString = IOUtils.toString(entity.open(),
StandardCharsets.UTF_8);
throw new ParseException(
- intermediateRow,
+ rowAsString,
"Unable to parse [%s] as the intermediateRow resulted in empty input
row",
- intermediateRow
+ rowAsString
);
}
+
return inputRows;
}
@Override
- protected List<Map<String, Object>> toMap(String intermediateRow) throws
IOException
+ protected List<Map<String, Object>> toMap(InputEntity entity) throws
IOException
{
- try (JsonParser parser = jsonFactory.createParser(intermediateRow)) {
+ try (JsonParser parser = jsonFactory.createParser(entity.open())) {
final MappingIterator<Map> delegate = mapper.readValues(parser,
Map.class);
return FluentIterable.from(() -> delegate)
.transform(map -> (Map<String, Object>) map)
@@ -145,7 +164,8 @@ public class JsonReader extends
IntermediateRowParsingReader<String>
//convert Jackson's JsonParseException into druid's exception for
further processing
//JsonParseException will be thrown from MappingIterator#hasNext or
MappingIterator#next when input json text is ill-formed
if (e.getCause() instanceof JsonParseException) {
- throw new ParseException(intermediateRow, e, "Unable to parse row
[%s]", intermediateRow);
+ final String rowAsString = IOUtils.toString(entity.open(),
StandardCharsets.UTF_8);
+ throw new ParseException(rowAsString, e, "Unable to parse row [%s]",
rowAsString);
}
//throw unknown exception
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]