This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 58a8a23243f Avoid conversion to String in JsonReader, JsonNodeReader. (#15693)
58a8a23243f is described below

commit 58a8a23243f956e0bf22651767fd70bc333d0621
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Mar 26 08:16:05 2024 -0700

    Avoid conversion to String in JsonReader, JsonNodeReader. (#15693)
    
    * Avoid conversion to String in JsonReader, JsonNodeReader.
    
    These readers were running UTF-8 decode on the provided entity to
    convert it to a String, then parsing the String as JSON. The patch
    changes them to parse the provided entity's input stream directly.
    
    In order to preserve the nice error messages that include parse errors,
    the readers now need to open the entity again on the error path, to
    re-read the data. To make this possible, the InputEntity#open contract
    is tightened to require the ability to re-open entities, and existing
    InputEntity implementations are updated to allow re-opening.
    
    This patch also renames JsonLineReaderBenchmark to JsonInputFormatBenchmark,
    updates it to benchmark all three JSON readers, and adds a case that reads
    fields out of the parsed row (not just creates it).
    
    * Fixes for static analysis.
    
    * Implement intermediateRowAsString in JsonReader.
    
    * Enhanced JsonInputFormatBenchmark.
    
    Renames JsonLineReaderBenchmark to JsonInputFormatBenchmark, and enhances
    it to test various readers (JsonReader, JsonLineReader, JsonNodeReader) as
    well as to test with/without field discovery.
---
 benchmarks/pom.xml                                 |   5 +
 .../druid/benchmark/JsonInputFormatBenchmark.java  | 319 +++++++++++++++++++++
 .../druid/benchmark/JsonLineReaderBenchmark.java   | 174 -----------
 .../druid/indexing/input/InputRowSchemas.java      |   4 +-
 .../seekablestream/SeekableStreamSamplerSpec.java  |   3 +-
 .../seekablestream/SettableByteEntity.java         |  26 +-
 .../indexing/seekablestream/StreamChunkParser.java |   2 +-
 .../org/apache/druid/data/input/InputEntity.java   |   4 +-
 .../apache/druid/data/input/impl/ByteEntity.java   |  13 +-
 .../druid/data/input/impl/JsonNodeReader.java      |  16 +-
 .../apache/druid/data/input/impl/JsonReader.java   |  60 ++--
 11 files changed, 393 insertions(+), 233 deletions(-)

diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index c9a4768d4bd..56bd3700a58 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -65,6 +65,11 @@
       <artifactId>druid-server</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-indexing-service</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-sql</artifactId>
diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/JsonInputFormatBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/JsonInputFormatBenchmark.java
new file mode 100644
index 00000000000..df8d2aa126b
--- /dev/null
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/JsonInputFormatBenchmark.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.JsonLineReader;
+import org.apache.druid.data.input.impl.JsonNodeReader;
+import org.apache.druid.data.input.impl.JsonReader;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.input.InputRowSchemas;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.RowAdapter;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+/**
+ * Tests {@link JsonInputFormat} delegates, one per {@link ReaderType}.
+ *
+ * Output is in nanoseconds per parse (or parse and read) of {@link 
#DATA_STRING}.
+ */
+@State(Scope.Benchmark)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@OperationsPerInvocation(JsonInputFormatBenchmark.NUM_EVENTS)
+@Warmup(iterations = 5)
+@Measurement(iterations = 10)
+@Fork(value = 1)
+public class JsonInputFormatBenchmark
+{
+  enum ReaderType
+  {
+    READER(JsonReader.class) {
+      @Override
+      public JsonInputFormat createFormat(JSONPathSpec flattenSpec)
+      {
+        return new JsonInputFormat(flattenSpec, null, null, false, 
false).withLineSplittable(false);
+      }
+    },
+    LINE_READER(JsonLineReader.class) {
+      @Override
+      public JsonInputFormat createFormat(JSONPathSpec flattenSpec)
+      {
+        return new JsonInputFormat(flattenSpec, null, null, null, 
null).withLineSplittable(true);
+      }
+    },
+    NODE_READER(JsonNodeReader.class) {
+      @Override
+      public JsonInputFormat createFormat(JSONPathSpec flattenSpec)
+      {
+        return new JsonInputFormat(flattenSpec, null, null, false, 
true).withLineSplittable(false);
+      }
+    };
+
+    private final Class<? extends InputEntityReader> clazz;
+
+    ReaderType(Class<? extends InputEntityReader> clazz)
+    {
+      this.clazz = clazz;
+    }
+
+    public abstract JsonInputFormat createFormat(JSONPathSpec flattenSpec);
+  }
+
+  public static final int NUM_EVENTS = 1000;
+
+  private static final String DATA_STRING =
+      "{" +
+      "\"stack\":\"mainstack\"," +
+      "\"metadata\":" +
+      "{" +
+      "\"application\":\"applicationname\"," +
+      "\"detail\":\"tm\"," +
+      
"\"id\":\"123456789012345678901234567890346973eb4c30eca8a4df79c8219d152cfe0d7d6bdb11a12e609c0c\","
 +
+      
"\"idtwo\":\"123456789012345678901234567890346973eb4c30eca8a4df79c8219d152cfe0d7d6bdb11a12e609c0c\","
 +
+      "\"sequence\":\"v008\"," +
+      "\"stack\":\"mainstack\"," +
+      "\"taskId\":\"12345678-1234-1234-1234-1234567890ab\"," +
+      "\"taskIdTwo\":\"12345678-1234-1234-1234-1234567890ab\"" +
+      "}," +
+      "\"_cluster_\":\"kafka\"," +
+      "\"_id_\":\"12345678-1234-1234-1234-1234567890ab\"," +
+      "\"_offset_\":12111398526," +
+      "\"type\":\"CUMULATIVE_DOUBLE\"," +
+      "\"version\":\"v1\"," +
+      "\"timestamp\":1670425782281," +
+      "\"point\":{\"seconds\":1670425782,\"nanos\":217000000,\"value\":0}," +
+      "\"_kafka_timestamp_\":1670425782304," +
+      "\"_partition_\":60," +
+      "\"ec2_instance_id\":\"i-1234567890\"," +
+      "\"name\":\"packets_received\"," +
+      "\"_topic_\":\"test_topic\"}";
+
+  private static final List<String> FIELDS_TO_READ =
+      ImmutableList.of(
+          "stack",
+          "_cluster_",
+          "_id_",
+          "_offset_",
+          "type",
+          "version",
+          "_kafka_timestamp_",
+          "_partition_",
+          "ec2_instance_id",
+          "name",
+          "_topic_", // Must match the "_topic_" key of DATA_STRING exactly.
+          "root_type", // The last three are produced by the flattenSpec in setUpTrial().
+          "path_app",
+          "jq_app"
+      );
+
+  ReaderType readerType;
+  InputRowSchema inputRowSchema;
+  InputEntityReader reader;
+  JsonInputFormat format;
+  List<Function<InputRow, Object>> fieldFunctions;
+  ByteEntity data;
+
+  @Param({"reader", "node_reader", "line_reader"})
+  private String readerTypeString;
+
+  /**
+   * If false: only read {@link #FIELDS_TO_READ}. If true: discover and read 
all fields.
+   */
+  @Param({"false", "true"})
+  private boolean discovery;
+
+  @Setup
+  public void setUpTrial() throws Exception
+  {
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final byte[] dataUtf8 = StringUtils.toUtf8(DATA_STRING);
+
+    for (int i = 0; i < NUM_EVENTS; i++) {
+      baos.write(dataUtf8);
+      baos.write(new byte[]{'\n'});
+    }
+
+    final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", 
null);
+    final DimensionsSpec dimensionsSpec;
+
+    if (discovery) {
+      // Discovered schema, excluding uninteresting fields that are not in 
FIELDS_TO_READ.
+      final Set<String> exclusions = Sets.difference(
+          TestHelper.makeJsonMapper()
+                    .readValue(DATA_STRING, 
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)
+                    .keySet(),
+          ImmutableSet.copyOf(FIELDS_TO_READ)
+      );
+
+      dimensionsSpec = DimensionsSpec.builder()
+                                     .useSchemaDiscovery(true)
+                                     
.setDimensionExclusions(ImmutableList.copyOf(exclusions))
+                                     .build();
+    } else {
+      // Fully defined schema.
+      dimensionsSpec = DimensionsSpec.builder()
+                                     
.setDimensions(DimensionsSpec.getDefaultSchemas(FIELDS_TO_READ))
+                                     .build();
+    }
+
+    data = new ByteEntity(baos.toByteArray());
+    readerType = ReaderType.valueOf(StringUtils.toUpperCase(readerTypeString));
+    inputRowSchema = new InputRowSchema(
+        timestampSpec,
+        dimensionsSpec,
+        InputRowSchemas.createColumnsFilter(
+            timestampSpec,
+            dimensionsSpec,
+            TransformSpec.NONE,
+            new AggregatorFactory[0]
+        )
+    );
+    format = readerType.createFormat(
+        new JSONPathSpec(
+            true,
+            ImmutableList.of(
+                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_type", 
"type"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_app", 
"$.metadata.application"),
+                new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_app", 
".metadata.application")
+            )
+        )
+    );
+
+    final RowAdapter<InputRow> rowAdapter = 
format.createRowAdapter(inputRowSchema);
+    fieldFunctions = new ArrayList<>(FIELDS_TO_READ.size());
+
+    for (final String field : FIELDS_TO_READ) {
+      fieldFunctions.add(rowAdapter.columnFunction(field));
+    }
+
+    reader = format.createReader(inputRowSchema, data, null);
+
+    if (reader.getClass() != readerType.clazz) {
+      throw new ISE(
+          "Expected class[%s] for readerType[%s], got[%s]",
+          readerType.clazz,
+          readerTypeString,
+          reader.getClass()
+      );
+    }
+  }
+
+  /**
+   * Benchmark parsing, but not reading fields.
+   */
+  @Benchmark
+  public void parse(final Blackhole blackhole) throws IOException
+  {
+    data.getBuffer().rewind();
+
+    int counted = 0;
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      while (iterator.hasNext()) {
+        final InputRow row = iterator.next();
+        if (row != null) {
+          counted += 1;
+          blackhole.consume(row);
+        }
+      }
+    }
+
+    if (counted != NUM_EVENTS) {
+      throw new RuntimeException("invalid number of loops, counted = " + 
counted);
+    }
+  }
+
+  /**
+   * Benchmark parsing and reading {@link #FIELDS_TO_READ}. More realistic 
than {@link #parse(Blackhole)}.
+   */
+  @Benchmark
+  public void parseAndRead(final Blackhole blackhole) throws IOException
+  {
+    data.getBuffer().rewind();
+
+    int counted = 0;
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      while (iterator.hasNext()) {
+        final InputRow row = iterator.next();
+        if (row != null) {
+          for (Function<InputRow, Object> fieldFunction : fieldFunctions) {
+            blackhole.consume(fieldFunction.apply(row));
+          }
+
+          counted += 1;
+        }
+      }
+    }
+
+    if (counted != NUM_EVENTS) {
+      throw new RuntimeException("invalid number of loops, counted = " + 
counted);
+    }
+  }
+
+  public static void main(String[] args) throws RunnerException
+  {
+    Options opt = new OptionsBuilder()
+        .include(JsonInputFormatBenchmark.class.getSimpleName())
+        .build();
+
+    new Runner(opt).run();
+  }
+}
diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/JsonLineReaderBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/JsonLineReaderBenchmark.java
deleted file mode 100644
index 44b3054ccbf..00000000000
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/JsonLineReaderBenchmark.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.benchmark;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.data.input.ColumnsFilter;
-import org.apache.druid.data.input.InputEntityReader;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.impl.ByteEntity;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.JsonInputFormat;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
-import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
-import org.apache.druid.java.util.common.parsers.JSONPathSpec;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.Warmup;
-import org.openjdk.jmh.infra.Blackhole;
-import org.openjdk.jmh.runner.Runner;
-import org.openjdk.jmh.runner.RunnerException;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-@State(Scope.Benchmark)
-@BenchmarkMode(Mode.AverageTime)
-@Warmup(iterations = 10)
-@Measurement(iterations = 25)
-@Fork(value = 1)
-public class JsonLineReaderBenchmark
-{
-  private static final int NUM_EVENTS = 1000;
-
-  InputEntityReader reader;
-  JsonInputFormat format;
-  byte[] data;
-
-  @Setup(Level.Invocation)
-  public void prepareReader()
-  {
-    ByteEntity source = new ByteEntity(data);
-    reader = format.createReader(
-            new InputRowSchema(
-                    new TimestampSpec("timestamp", "iso", null),
-                    new 
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", 
"foo"))),
-                    ColumnsFilter.all()
-            ),
-            source,
-            null
-    );
-  }
-
-  @Setup
-  public void prepareData() throws Exception
-  {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-    String dataString = "{" +
-            "\"stack\":\"mainstack\"," +
-            "\"metadata\":" +
-            "{" +
-            "\"application\":\"applicationname\"," +
-            "\"detail\":\"tm\"," +
-            
"\"id\":\"123456789012345678901234567890346973eb4c30eca8a4df79c8219d152cfe0d7d6bdb11a12e609c0c\","
 +
-            
"\"idtwo\":\"123456789012345678901234567890346973eb4c30eca8a4df79c8219d152cfe0d7d6bdb11a12e609c0c\","
 +
-            "\"sequence\":\"v008\"," +
-            "\"stack\":\"mainstack\"," +
-            "\"taskId\":\"12345678-1234-1234-1234-1234567890ab\"," +
-            "\"taskIdTwo\":\"12345678-1234-1234-1234-1234567890ab\"" +
-            "}," +
-            "\"_cluster_\":\"kafka\"," +
-            "\"_id_\":\"12345678-1234-1234-1234-1234567890ab\"," +
-            "\"_offset_\":12111398526," +
-            "\"type\":\"CUMULATIVE_DOUBLE\"," +
-            "\"version\":\"v1\"," +
-            "\"timestamp\":1670425782281," +
-            
"\"point\":{\"seconds\":1670425782,\"nanos\":217000000,\"value\":0}," +
-            "\"_kafka_timestamp_\":1670425782304," +
-            "\"_partition_\":60," +
-            "\"ec2_instance_id\":\"i-1234567890\"," +
-            "\"name\":\"packets_received\"," +
-            "\"_topic_\":\"test_topic\"}";
-    for (int i = 0; i < NUM_EVENTS; i++) {
-      baos.write(StringUtils.toUtf8(dataString));
-      baos.write(new byte[]{'\n'});
-    }
-
-    data = baos.toByteArray();
-  }
-
-  @Setup
-  public void prepareFormat()
-  {
-    format = new JsonInputFormat(
-            new JSONPathSpec(
-                    true,
-                    ImmutableList.of(
-                            new JSONPathFieldSpec(JSONPathFieldType.ROOT, 
"root_baz", "baz"),
-                            new JSONPathFieldSpec(JSONPathFieldType.ROOT, 
"root_baz2", "baz2"),
-                            new JSONPathFieldSpec(JSONPathFieldType.PATH, 
"path_omg", "$.o.mg"),
-                            new JSONPathFieldSpec(JSONPathFieldType.PATH, 
"path_omg2", "$.o.mg2"),
-                            new JSONPathFieldSpec(JSONPathFieldType.JQ, 
"jq_omg", ".o.mg"),
-                            new JSONPathFieldSpec(JSONPathFieldType.JQ, 
"jq_omg2", ".o.mg2")
-                    )
-            ),
-            null,
-            null,
-            null,
-            null
-    );
-  }
-
-  @Benchmark
-  @BenchmarkMode(Mode.AverageTime)
-  @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void baseline(final Blackhole blackhole) throws IOException
-  {
-    int counted = 0;
-    try (CloseableIterator<InputRow> iterator = reader.read()) {
-      while (iterator.hasNext()) {
-        final InputRow row = iterator.next();
-        if (row != null) {
-          counted += 1;
-        }
-        blackhole.consume(row);
-      }
-    }
-
-    if (counted != NUM_EVENTS) {
-      throw new RuntimeException("invalid number of loops, counted = " + 
counted);
-    }
-  }
-
-  public static void main(String[] args) throws RunnerException
-  {
-    Options opt = new OptionsBuilder()
-        .include(JsonLineReaderBenchmark.class.getSimpleName())
-        .build();
-
-    new Runner(opt).run();
-  }
-}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
index c895eb14b71..3a04b71e2f1 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.input;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -69,8 +68,7 @@ public class InputRowSchemas
    *
    * @see InputRowSchema#getColumnsFilter()
    */
-  @VisibleForTesting
-  static ColumnsFilter createColumnsFilter(
+  public static ColumnsFilter createColumnsFilter(
       final TimestampSpec timestampSpec,
       final DimensionsSpec dimensionsSpec,
       final TransformSpec transformSpec,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
index af850dd1050..4705f333722 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -236,7 +236,8 @@ public abstract class 
SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
         @Override
         public InputRowListPlusRawValues next()
         {
-          final ByteBuffer bb = ((ByteEntity) 
entityIterator.next()).getBuffer();
+          // We need to modify the position of the buffer, so duplicate it.
+          final ByteBuffer bb = ((ByteEntity) 
entityIterator.next()).getBuffer().duplicate();
 
           final Map<String, Object> rawColumns;
           try {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntity.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntity.java
index 6fe347691fe..bac59d4b3f0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntity.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntity.java
@@ -37,27 +37,17 @@ import java.nio.ByteBuffer;
  * processing where binary records are arriving as a list but {@link 
org.apache.druid.data.input.InputEntityReader}, that
  * parses the data, expects an {@link InputStream}. This class mimics a 
continuous InputStream while behind the scenes,
  * binary records are being put one after the other that the InputStream 
consumes bytes from. One record is fully
- * consumed and only then the next record is set. This class doesn't allow 
reading the same data twice.
+ * consumed and only then the next record is set.
  * This class solely exists to overcome the limitations imposed by interfaces 
for reading and parsing data.
- *
  */
 @NotThreadSafe
 public class SettableByteEntity<T extends ByteEntity> implements InputEntity
 {
-  private final SettableByteBufferInputStream inputStream;
-  private boolean opened = false;
   private T entity;
 
-  public SettableByteEntity()
-  {
-    this.inputStream = new SettableByteBufferInputStream();
-  }
-
   public void setEntity(T entity)
   {
-    inputStream.setBuffer(entity.getBuffer());
     this.entity = entity;
-    opened = false;
   }
 
   @Nullable
@@ -72,19 +62,13 @@ public class SettableByteEntity<T extends ByteEntity> 
implements InputEntity
     return entity;
   }
 
-  /**
-   * This method can be called multiple times only for different data. So you 
can open a new input stream
-   * only after a new buffer is in use.
-   */
   @Override
   public InputStream open()
   {
-    if (opened) {
-      throw new IllegalArgumentException("Can't open the input stream on 
SettableByteEntity more than once");
-    }
-
-    opened = true;
-    return inputStream;
+    // Duplicate the entity buffer, because the stream will update its 
position.
+    final SettableByteBufferInputStream stream = new 
SettableByteBufferInputStream();
+    stream.setBuffer(entity.getBuffer().duplicate());
+    return stream;
   }
 
   public static final class SettableByteBufferInputStream extends InputStream
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
index 2ee14917e33..3ac952c16c2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
@@ -133,7 +133,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
   {
     final FluentIterable<InputRow> iterable = FluentIterable
         .from(valueBytes)
-        .transform(ByteEntity::getBuffer)
+        .transform(entity -> entity.getBuffer().duplicate() /* Parsing may 
need to modify buffer position */)
         .transform(this::incrementProcessedBytes)
         .transformAndConcat(parser::parseBatch);
 
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/InputEntity.java 
b/processing/src/main/java/org/apache/druid/data/input/InputEntity.java
index 6765ae82717..d6ea7211573 100644
--- a/processing/src/main/java/org/apache/druid/data/input/InputEntity.java
+++ b/processing/src/main/java/org/apache/druid/data/input/InputEntity.java
@@ -65,9 +65,7 @@ public interface InputEntity
   /**
    * Opens an {@link InputStream} on the input entity directly.
    * This is the basic way to read the given entity.
-   *
-   * The behavior of this method is only defined fort the first call to open().
-   * The behavior of subsequent calls is undefined and may vary between 
implementations.
+   * This method may be called multiple times to re-read the data from the 
entity.
    *
    * @see #fetch
    */
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/ByteEntity.java 
b/processing/src/main/java/org/apache/druid/data/input/impl/ByteEntity.java
index 13376912b9a..db4f3396ae4 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/ByteEntity.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/ByteEntity.java
@@ -31,9 +31,13 @@ public class ByteEntity implements InputEntity
 {
   private final ByteBuffer buffer;
 
+  /**
+   * Create a new entity. The buffer is not duplicated, so it is important to 
ensure that its position and limit
+   * are not modified after this entity is created.
+   */
   public ByteEntity(ByteBuffer buffer)
   {
-    this.buffer = buffer.duplicate();
+    this.buffer = buffer;
   }
 
   public ByteEntity(byte[] bytes)
@@ -41,6 +45,11 @@ public class ByteEntity implements InputEntity
     this(ByteBuffer.wrap(bytes));
   }
 
+  /**
+   * Return the buffer backing this entity. Calling code must not modify the 
mark, position, or limit of this buffer.
+   * If you need to modify them, call {@link ByteBuffer#duplicate()} or {@link 
ByteBuffer#asReadOnlyBuffer()} and
+   * modify the copy.
+   */
   public ByteBuffer getBuffer()
   {
     return buffer;
@@ -56,6 +65,6 @@ public class ByteEntity implements InputEntity
   @Override
   public InputStream open()
   {
-    return new ByteBufferInputStream(buffer);
+    return new ByteBufferInputStream(buffer.duplicate());
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java 
b/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
index b5a61f69292..d45a34e8b81 100644
--- 
a/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
+++ 
b/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
@@ -33,7 +33,6 @@ import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.IntermediateRowParsingReader;
 import org.apache.druid.java.util.common.CloseableIterators;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -43,6 +42,7 @@ import 
org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.utils.CollectionUtils;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -94,10 +94,8 @@ public class JsonNodeReader extends 
IntermediateRowParsingReader<JsonNode>
   @Override
   protected CloseableIterator<JsonNode> intermediateRowIterator() throws 
IOException
   {
-    final String sourceString = IOUtils.toString(source.open(), 
StringUtils.UTF8_STRING);
     final List<JsonNode> jsonNodes = new ArrayList<>();
-    try {
-      JsonParser parser = jsonFactory.createParser(sourceString);
+    try (final JsonParser parser = jsonFactory.createParser(source.open())) {
       final MappingIterator<JsonNode> delegate = mapper.readValues(parser, 
JsonNode.class);
       while (delegate.hasNext()) {
         jsonNodes.add(delegate.next());
@@ -107,9 +105,10 @@ public class JsonNodeReader extends 
IntermediateRowParsingReader<JsonNode>
       //convert Jackson's JsonParseException into druid's exception for 
further processing
       //JsonParseException will be thrown from MappingIterator#hasNext or 
MappingIterator#next when input json text is ill-formed
       if (e.getCause() instanceof JsonParseException) {
+        final String rowAsString = IOUtils.toString(source.open(), 
StandardCharsets.UTF_8);
         jsonNodes.add(
             new ParseExceptionMarkerJsonNode(
-                new ParseException(sourceString, e, "Unable to parse row 
[%s]", sourceString)
+                new ParseException(rowAsString, e, "Unable to parse row [%s]", 
rowAsString)
             )
         );
       } else {
@@ -117,13 +116,14 @@ public class JsonNodeReader extends IntermediateRowParsingReader<JsonNode>
       }
     }
 
-    if (CollectionUtils.isNullOrEmpty(jsonNodes)) {
+    if (jsonNodes.isEmpty()) {
+      final String rowAsString = IOUtils.toString(source.open(), StandardCharsets.UTF_8);
       jsonNodes.add(
           new ParseExceptionMarkerJsonNode(
               new ParseException(
-                  sourceString,
+                  rowAsString,
                   "Unable to parse [%s] as the intermediateRow resulted in empty input row",
-                  sourceString
+                  rowAsString
               )
           )
       );
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
index 69f518aecad..3a0fcf45ab8 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
@@ -33,16 +33,17 @@ import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.IntermediateRowParsingReader;
 import org.apache.druid.java.util.common.CloseableIterators;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.common.parsers.ObjectFlattener;
 import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
 import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.utils.CollectionUtils;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -59,7 +60,7 @@ import java.util.Map;
  *
  * For more information, see: https://github.com/apache/druid/pull/10383
  */
-public class JsonReader extends IntermediateRowParsingReader<String>
+public class JsonReader extends IntermediateRowParsingReader<InputEntity>
 {
   private final ObjectFlattener<JsonNode> flattener;
   private final ObjectMapper mapper;
@@ -89,11 +90,9 @@ public class JsonReader extends IntermediateRowParsingReader<String>
   }
 
   @Override
-  protected CloseableIterator<String> intermediateRowIterator() throws IOException
+  protected CloseableIterator<InputEntity> intermediateRowIterator()
   {
-    return CloseableIterators.withEmptyBaggage(
-        Iterators.singletonIterator(IOUtils.toString(source.open(), StringUtils.UTF8_STRING))
-    );
+    return CloseableIterators.withEmptyBaggage(Iterators.singletonIterator(source));
   }
 
   @Override
@@ -103,39 +102,59 @@ public class JsonReader extends IntermediateRowParsingReader<String>
   }
 
   @Override
-  protected List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException
+  protected String intermediateRowAsString(@Nullable InputEntity entity)
   {
-    final List<InputRow> inputRows;
-    try (JsonParser parser = jsonFactory.createParser(intermediateRow)) {
+    if (entity == null) {
+      return "null";
+    } else {
+      try {
+        return IOUtils.toString(entity.open(), StandardCharsets.UTF_8);
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Override
+  protected List<InputRow> parseInputRows(InputEntity entity) throws IOException, ParseException
+  {
+    final List<InputRow> inputRows = new ArrayList<>();
+    try (JsonParser parser = jsonFactory.createParser(entity.open())) {
       final MappingIterator<JsonNode> delegate = mapper.readValues(parser, JsonNode.class);
-      inputRows = FluentIterable.from(() -> delegate)
-                                .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
-                                .toList();
+      while (delegate.hasNext()) {
+        final JsonNode row = delegate.next();
+        inputRows.add(MapInputRowParser.parse(inputRowSchema, flattener.flatten(row)));
+      }
     }
     catch (RuntimeException e) {
       //convert Jackson's JsonParseException into druid's exception for further processing
       //JsonParseException will be thrown from MappingIterator#hasNext or MappingIterator#next when input json text is ill-formed
       if (e.getCause() instanceof JsonParseException) {
-        throw new ParseException(intermediateRow, e, "Unable to parse row [%s]", intermediateRow);
+        final String rowAsString = IOUtils.toString(entity.open(), StandardCharsets.UTF_8);
+        throw new ParseException(rowAsString, e, "Unable to parse row [%s]", rowAsString);
       }
 
       //throw unknown exception
       throw e;
     }
-    if (CollectionUtils.isNullOrEmpty(inputRows)) {
+
+    if (inputRows.isEmpty()) {
+      final String rowAsString = IOUtils.toString(entity.open(), StandardCharsets.UTF_8);
       throw new ParseException(
-          intermediateRow,
+          rowAsString,
           "Unable to parse [%s] as the intermediateRow resulted in empty input row",
-          intermediateRow
+          rowAsString
       );
     }
+
     return inputRows;
   }
 
   @Override
-  protected List<Map<String, Object>> toMap(String intermediateRow) throws IOException
+  protected List<Map<String, Object>> toMap(InputEntity entity) throws IOException
   {
-    try (JsonParser parser = jsonFactory.createParser(intermediateRow)) {
+    try (JsonParser parser = jsonFactory.createParser(entity.open())) {
       final MappingIterator<Map> delegate = mapper.readValues(parser, Map.class);
       return FluentIterable.from(() -> delegate)
                            .transform(map -> (Map<String, Object>) map)
@@ -145,7 +164,8 @@ public class JsonReader extends IntermediateRowParsingReader<String>
       //convert Jackson's JsonParseException into druid's exception for further processing
       //JsonParseException will be thrown from MappingIterator#hasNext or MappingIterator#next when input json text is ill-formed
       if (e.getCause() instanceof JsonParseException) {
-        throw new ParseException(intermediateRow, e, "Unable to parse row [%s]", intermediateRow);
+        final String rowAsString = IOUtils.toString(entity.open(), StandardCharsets.UTF_8);
+        throw new ParseException(rowAsString, e, "Unable to parse row [%s]", rowAsString);
       }
 
       //throw unknown exception


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to