This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef975540d7 Add "lines" input format. (#18433)
6ef975540d7 is described below

commit 6ef975540d7f9cb98303b9f4b50547e42c5a1101
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Aug 26 16:10:58 2025 -0700

    Add "lines" input format. (#18433)
    
    * Add "lines" input format.
    
    This is useful for reading entire lines for later processing. For example,
    it can be used to read newline-delimited JSON into a single nested column,
    by combining this input format with the TRY_PARSE_JSON function.
    
    * Fix style.
---
 docs/ingestion/data-formats.md                     |  22 ++++
 .../org/apache/druid/data/input/InputFormat.java   |   2 +
 .../druid/data/input/impl/FlatTextInputFormat.java |   6 +-
 .../druid/data/input/impl/JsonInputFormat.java     |   6 +-
 .../druid/data/input/impl/LinesInputFormat.java    |  60 +++++++++++
 .../apache/druid/data/input/impl/LinesReader.java  |  80 ++++++++++++++
 .../druid/data/input/impl/RegexInputFormat.java    |   6 +-
 .../org/apache/druid/utils/CompressionUtils.java   |  18 +++-
 .../data/input/impl/LinesInputFormatTest.java      |  79 ++++++++++++++
 .../druid/data/input/impl/LinesReaderTest.java     | 116 +++++++++++++++++++++
 10 files changed, 379 insertions(+), 16 deletions(-)

diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index a595b1c6ded..b36ab87d654 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -168,6 +168,28 @@ For example:
 }
 ```
 
+### Lines
+
+Configure the Lines `inputFormat` to load line-oriented data where each line is treated as a single field:
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | Set value to `lines`. | yes |
+
+The Lines input format reads each line from the input as UTF-8 text and creates a single column named `line` containing the entire line as a string.
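+This is useful for reading line-oriented data in a simple form for later processing, such as loading newline-delimited JSON into a single nested column with the `TRY_PARSE_JSON` function.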
+
+For example:
+
+```json
+"ioConfig": {
+  "inputFormat": {
+    "type": "lines"
+  },
+  ...
+}
+```
+
 ### ORC
 
 To use the ORC input format, load the Druid Orc extension ( [`druid-orc-extensions`](../development/extensions-core/orc.md)).
diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java
index ebf2dea6687..0103ea66c9c 100644
--- a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java
+++ b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DelimitedInputFormat;
 import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.LinesInputFormat;
 import org.apache.druid.data.input.impl.NestedInputFormat;
 import org.apache.druid.data.input.impl.RegexInputFormat;
 import org.apache.druid.data.input.impl.SplittableInputSource;
@@ -48,6 +49,7 @@ import java.io.File;
 @JsonSubTypes(value = {
     @Type(name = CsvInputFormat.TYPE_KEY, value = CsvInputFormat.class),
     @Type(name = JsonInputFormat.TYPE_KEY, value = JsonInputFormat.class),
+    @Type(name = LinesInputFormat.TYPE_KEY, value = LinesInputFormat.class),
     @Type(name = RegexInputFormat.TYPE_KEY, value = RegexInputFormat.class),
     @Type(name = DelimitedInputFormat.TYPE_KEY, value = 
DelimitedInputFormat.class)
 })
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java
index 39086b6838e..f046a894f40 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java
@@ -145,11 +145,7 @@ public abstract class FlatTextInputFormat implements InputFormat
   @Override
   public long getWeightedSize(String path, long size)
   {
-    CompressionUtils.Format compressionFormat = 
CompressionUtils.Format.fromFileName(path);
-    if (CompressionUtils.Format.GZ == compressionFormat) {
-      return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
-    }
-    return size;
+    return size * 
CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path));
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
index 1155bd23e30..a605fd7a8ae 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
@@ -174,11 +174,7 @@ public class JsonInputFormat extends NestedInputFormat
   @Override
   public long getWeightedSize(String path, long size)
   {
-    CompressionUtils.Format compressionFormat = 
CompressionUtils.Format.fromFileName(path);
-    if (CompressionUtils.Format.GZ == compressionFormat) {
-      return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
-    }
-    return size;
+    return size * 
CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path));
   }
 
   /**
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LinesInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/LinesInputFormat.java
new file mode 100644
index 00000000000..1d47b71992b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/LinesInputFormat.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.utils.CompressionUtils;
+
+import java.io.File;
+
+/**
+ * Input format that breaks the input on newlines, and returns a single string column named {@code line} holding
+ * the contents of each line. Reading is performed by {@link LinesReader}.
+ */
+public class LinesInputFormat implements InputFormat
+{
+  public static final String TYPE_KEY = "lines";
+
+  @JsonCreator
+  public LinesInputFormat()
+  {
+    // Nothing to configure: every instance behaves identically.
+  }
+
+  /**
+   * This format does not support splitting individual files.
+   */
+  @Override
+  public boolean isSplittable()
+  {
+    return false;
+  }
+
+  @Override
+  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
+  {
+    // temporaryDirectory is unused: LinesReader only needs the schema and the entity.
+    return new LinesReader(inputRowSchema, source);
+  }
+
+  /**
+   * Scales the size by the estimated compression factor of the file's compression format, as inferred from its
+   * name. This matches the weighting used by the other text-based input formats.
+   */
+  @Override
+  public long getWeightedSize(String path, long size)
+  {
+    return size * CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path));
+  }
+
+  /**
+   * All instances of the same class are equal, since this format carries no configuration. Without this, a format
+   * deserialized from JSON would never compare equal to another instance.
+   */
+  @Override
+  public boolean equals(Object o)
+  {
+    return o != null && getClass() == o.getClass();
+  }
+
+  @Override
+  public int hashCode()
+  {
+    // Consistent with equals(): equality depends only on the runtime class.
+    return getClass().hashCode();
+  }
+
+  @Override
+  public String toString()
+  {
+    return "LinesInputFormat{}";
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LinesReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/LinesReader.java
new file mode 100644
index 00000000000..57f32599e13
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/LinesReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.TextReader;
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reader for {@link LinesInputFormat}. Every line handed to this reader by {@link TextReader.Strings} becomes one
+ * row whose only field is {@link #LINE_COLUMN}, holding the line text without further transformation.
+ */
+public class LinesReader extends TextReader.Strings
+{
+  /**
+   * Name of the single column produced for each line. This is public so that it can be referenced outside this
+   * class, including from the documentation of {@link LinesInputFormat}.
+   */
+  public static final String LINE_COLUMN = "line";
+
+  LinesReader(
+      InputRowSchema inputRowSchema,
+      InputEntity source
+  )
+  {
+    super(inputRowSchema, source);
+  }
+
+  /**
+   * Converts one line into exactly one {@link InputRow}, using this reader's {@link InputRowSchema}.
+   */
+  @Override
+  public List<InputRow> parseInputRows(String intermediateRow) throws ParseException
+  {
+    return List.of(MapInputRowParser.parse(getInputRowSchema(), parseLine(intermediateRow)));
+  }
+
+  /**
+   * Returns the single-entry map for one line, without applying the schema.
+   */
+  @Override
+  protected List<Map<String, Object>> toMap(String intermediateRow)
+  {
+    return List.of(parseLine(intermediateRow));
+  }
+
+  /**
+   * Builds an immutable one-entry map from {@link #LINE_COLUMN} to the given line.
+   */
+  private Map<String, Object> parseLine(String line)
+  {
+    return Map.of(LINE_COLUMN, line);
+  }
+
+  /**
+   * This format has no header, so no lines are skipped.
+   */
+  @Override
+  public int getNumHeaderLinesToSkip()
+  {
+    return 0;
+  }
+
+  @Override
+  public boolean needsToProcessHeaderLine()
+  {
+    return false;
+  }
+
+  @Override
+  public void processHeaderLine(String line)
+  {
+    // Nothing to do: needsToProcessHeaderLine() returns false.
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
index a6b823ae891..a3057fa212b 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
@@ -96,10 +96,6 @@ public class RegexInputFormat implements InputFormat
   @Override
   public long getWeightedSize(String path, long size)
   {
-    CompressionUtils.Format compressionFormat = 
CompressionUtils.Format.fromFileName(path);
-    if (CompressionUtils.Format.GZ == compressionFormat) {
-      return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
-    }
-    return size;
+    return size * 
CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path));
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
index 7ab05b90719..01b45c5089a 100644
--- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
+++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
@@ -32,6 +32,7 @@ import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorInpu
 import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
 import 
org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
 import org.apache.commons.compress.utils.FileNameUtils;
+import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.IAE;
@@ -120,7 +121,7 @@ public class CompressionUtils
     }
   }
 
-  public static final long COMPRESSED_TEXT_WEIGHT_FACTOR = 4L;
+  public static final int COMPRESSED_TEXT_WEIGHT_FACTOR = 4;
   private static final Logger log = new Logger(CompressionUtils.class);
   private static final int DEFAULT_RETRY_COUNT = 3;
   private static final int GZIP_BUFFER_SIZE = 8192; // Default is 512
@@ -655,4 +656,19 @@ public class CompressionUtils
       return in;
     }
   }
+
+  /**
+   * Estimates how much larger plain text is than its compressed form in the given format. Certain implementations
+   * of {@link InputFormat#getWeightedSize(String, long)} multiply file sizes by this value.
+   *
+   * @param format compression format of the file; may be null, in which case no scaling is applied
+   * @return {@link #COMPRESSED_TEXT_WEIGHT_FACTOR} for {@link Format#GZ}, otherwise 1
+   */
+  public static int estimatedCompressionFactor(@Nullable final Format format)
+  {
+    // Weighting only GZ dates back to https://github.com/apache/druid/pull/14307 and was later centralized here.
+    // Extending it to other (possibly all) compression formats may be worthwhile.
+    return format == Format.GZ ? COMPRESSED_TEXT_WEIGHT_FACTOR : 1;
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/LinesInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/LinesInputFormatTest.java
new file mode 100644
index 00000000000..e2b132a37e4
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/LinesInputFormatTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.utils.CompressionUtils;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class LinesInputFormatTest
+{
+  private static final long UNWEIGHTED_SIZE = 100L;
+
+  private ObjectMapper jsonMapper;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    jsonMapper = TestHelper.makeJsonMapper();
+  }
+
+  /**
+   * Round-trips the format through JSON. It checks both the raw "type" field and polymorphic deserialization.
+   */
+  @Test
+  public void testSerde() throws IOException
+  {
+    final byte[] serialized = jsonMapper.writeValueAsBytes(new LinesInputFormat());
+
+    final Map<String, Object> asMap = jsonMapper.readValue(serialized, Map.class);
+    Assert.assertEquals("lines", asMap.get("type"));
+
+    final InputFormat asInputFormat = jsonMapper.readValue(serialized, InputFormat.class);
+    MatcherAssert.assertThat(asInputFormat, Matchers.instanceOf(LinesInputFormat.class));
+  }
+
+  @Test
+  public void test_getWeightedSize_withoutCompression()
+  {
+    Assert.assertEquals(UNWEIGHTED_SIZE, new LinesInputFormat().getWeightedSize("file.txt", UNWEIGHTED_SIZE));
+  }
+
+  @Test
+  public void test_getWeightedSize_withGzCompression()
+  {
+    final long expected = UNWEIGHTED_SIZE * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
+    Assert.assertEquals(expected, new LinesInputFormat().getWeightedSize("file.txt.gz", UNWEIGHTED_SIZE));
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/LinesReaderTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/LinesReaderTest.java
new file mode 100644
index 00000000000..faf7d632c0a
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/LinesReaderTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for {@link LinesReader}, reading text supplied through an in-memory {@link ByteEntity}.
+ */
+public class LinesReaderTest extends InitializedNullHandlingTest
+{
+  @Test
+  public void testSimpleLineParsing() throws IOException
+  {
+    final String input = "line1\nline2\nline3";
+    final InputEntityReader reader = createReader(input, Collections.emptyList());
+
+    final List<InputRow> inputRows = readAllRows(reader);
+
+    Assert.assertEquals(3, inputRows.size());
+    Assert.assertEquals("line1", inputRows.get(0).getRaw("line"));
+    Assert.assertEquals("line2", inputRows.get(1).getRaw("line"));
+    Assert.assertEquals("line3", inputRows.get(2).getRaw("line"));
+  }
+
+  @Test
+  public void testEmptyLines() throws IOException
+  {
+    // A blank line yields an empty-string row, leading whitespace is kept, and the trailing newline does not
+    // produce an extra row.
+    final String input = "line1\n\n line3\n";
+    final InputEntityReader reader = createReader(input, Collections.emptyList());
+
+    final List<InputRow> inputRows = readAllRows(reader);
+
+    Assert.assertEquals(3, inputRows.size());
+    Assert.assertEquals("line1", inputRows.get(0).getRaw("line"));
+    Assert.assertEquals("", inputRows.get(1).getRaw("line"));
+    Assert.assertEquals(" line3", inputRows.get(2).getRaw("line"));
+  }
+
+  @Test
+  public void testSingleLine() throws IOException
+  {
+    final String input = "single line without newline";
+    final InputEntityReader reader = createReader(input, Collections.emptyList());
+
+    final List<InputRow> inputRows = readAllRows(reader);
+
+    Assert.assertEquals(1, inputRows.size());
+    Assert.assertEquals("single line without newline", inputRows.get(0).getRaw("line"));
+  }
+
+  @Test
+  public void testToMap()
+  {
+    final String input = "test line";
+    final LinesReader reader = (LinesReader) createReader(input, Collections.emptyList());
+
+    // Use the same input the reader was built with, rather than repeating the literal.
+    final List<Map<String, Object>> maps = reader.toMap(input);
+
+    Assert.assertEquals(1, maps.size());
+    Assert.assertEquals(ImmutableMap.of("line", "test line"), maps.get(0));
+  }
+
+  /**
+   * Creates a {@link LinesReader} over the UTF-8 bytes of {@code input}. Rows missing a "__time" value get the
+   * fixed timestamp 2000-01-01.
+   *
+   * @param columns dimension names for the schema's {@link DimensionsSpec}
+   */
+  private InputEntityReader createReader(
+      String input,
+      List<String> columns
+  )
+  {
+    final InputRowSchema inputRowSchema = new InputRowSchema(
+        new TimestampSpec("__time", "auto", DateTimes.of("2000-01-01T00:00:00.000Z")),
+        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(columns)),
+        ColumnsFilter.all()
+    );
+
+    final ByteEntity entity = new ByteEntity(StringUtils.toUtf8(input));
+
+    return new LinesReader(inputRowSchema, entity);
+  }
+
+  /**
+   * Drains the reader into a list, closing the iterator afterwards.
+   */
+  private List<InputRow> readAllRows(InputEntityReader reader) throws IOException
+  {
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      return Lists.newArrayList(iterator);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to