This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6ef975540d7 Add "lines" input format. (#18433)
6ef975540d7 is described below
commit 6ef975540d7f9cb98303b9f4b50547e42c5a1101
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Aug 26 16:10:58 2025 -0700
Add "lines" input format. (#18433)
* Add "lines" input format.
This is useful for reading entire lines for later processing. For example,
it can be used to read newline-delimited JSON into a single nested column,
by combining this input format with the TRY_PARSE_JSON function.
* Fix style.
---
docs/ingestion/data-formats.md | 22 ++++
.../org/apache/druid/data/input/InputFormat.java | 2 +
.../druid/data/input/impl/FlatTextInputFormat.java | 6 +-
.../druid/data/input/impl/JsonInputFormat.java | 6 +-
.../druid/data/input/impl/LinesInputFormat.java | 60 +++++++++++
.../apache/druid/data/input/impl/LinesReader.java | 80 ++++++++++++++
.../druid/data/input/impl/RegexInputFormat.java | 6 +-
.../org/apache/druid/utils/CompressionUtils.java | 18 +++-
.../data/input/impl/LinesInputFormatTest.java | 79 ++++++++++++++
.../druid/data/input/impl/LinesReaderTest.java | 116 +++++++++++++++++++++
10 files changed, 379 insertions(+), 16 deletions(-)
diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index a595b1c6ded..b36ab87d654 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -168,6 +168,28 @@ For example:
}
```
+### Lines
+
+Configure the Lines `inputFormat` to load line-oriented data where each line is treated as a single field:
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | Set value to `lines`. | yes |
+
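+The Lines input format reads each line from the input as UTF-8 text, and creates a single column named `line`
+containing the entire line as a string. Line terminators are not included in the value, a blank line produces a row
+whose `line` value is an empty string, and a trailing newline at the end of the input does not produce an extra row.
+This is useful for reading line-oriented data in a simple form for later processing. For example, combining this
+input format with the `TRY_PARSE_JSON` function lets you load newline-delimited JSON into a single nested column.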
+
+For example:
+
+```json
+"ioConfig": {
+ "inputFormat": {
+ "type": "lines"
+ },
+ ...
+}
+```
+
### ORC
To use the ORC input format, load the Druid Orc extension (
[`druid-orc-extensions`](../development/extensions-core/orc.md)).
diff --git
a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java
b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java
index ebf2dea6687..0103ea66c9c 100644
--- a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java
+++ b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DelimitedInputFormat;
import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.LinesInputFormat;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
@@ -48,6 +49,7 @@ import java.io.File;
@JsonSubTypes(value = {
@Type(name = CsvInputFormat.TYPE_KEY, value = CsvInputFormat.class),
@Type(name = JsonInputFormat.TYPE_KEY, value = JsonInputFormat.class),
+ @Type(name = LinesInputFormat.TYPE_KEY, value = LinesInputFormat.class),
@Type(name = RegexInputFormat.TYPE_KEY, value = RegexInputFormat.class),
@Type(name = DelimitedInputFormat.TYPE_KEY, value =
DelimitedInputFormat.class)
})
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java
b/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java
index 39086b6838e..f046a894f40 100644
---
a/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java
@@ -145,11 +145,7 @@ public abstract class FlatTextInputFormat implements
InputFormat
@Override
public long getWeightedSize(String path, long size)
{
-    CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
-    if (CompressionUtils.Format.GZ == compressionFormat) {
-      return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
-    }
-    return size;
+    // Compressed text expands when read, so scale the on-disk size by the factor estimated for its format.
+    final CompressionUtils.Format format = CompressionUtils.Format.fromFileName(path);
+    return size * CompressionUtils.estimatedCompressionFactor(format);
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
index 1155bd23e30..a605fd7a8ae 100644
---
a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
@@ -174,11 +174,7 @@ public class JsonInputFormat extends NestedInputFormat
@Override
public long getWeightedSize(String path, long size)
{
-    CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
-    if (CompressionUtils.Format.GZ == compressionFormat) {
-      return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
-    }
-    return size;
+    // Compressed text expands when read, so scale the on-disk size by the factor estimated for its format.
+    final CompressionUtils.Format format = CompressionUtils.Format.fromFileName(path);
+    return size * CompressionUtils.estimatedCompressionFactor(format);
}
/**
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/LinesInputFormat.java
b/processing/src/main/java/org/apache/druid/data/input/impl/LinesInputFormat.java
new file mode 100644
index 00000000000..1d47b71992b
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/LinesInputFormat.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.utils.CompressionUtils;
+
+import java.io.File;
+
+/**
+ * Input format that splits its input on newlines and produces one row per line. Each row has a single string
+ * column, {@code line}, holding the text of that line (see {@link LinesReader}).
+ *
+ * This format has no configuration, so all instances are interchangeable. {@link #equals(Object)} and
+ * {@link #hashCode()} reflect that, so that specs containing this format compare equal, for example after a
+ * JSON round trip.
+ */
+public class LinesInputFormat implements InputFormat
+{
+  public static final String TYPE_KEY = "lines";
+
+  @JsonCreator
+  public LinesInputFormat()
+  {
+  }
+
+  /**
+   * Returns false: this format does not support splitting an input file.
+   */
+  @Override
+  public boolean isSplittable()
+  {
+    return false;
+  }
+
+  @Override
+  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
+  {
+    return new LinesReader(inputRowSchema, source);
+  }
+
+  /**
+   * Scales the on-disk size by the estimated compression factor of the file's compression format, as detected
+   * from its name, since compressed text expands when read.
+   */
+  @Override
+  public long getWeightedSize(String path, long size)
+  {
+    return size * CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path));
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    // Stateless: any two instances of this exact class are equivalent.
+    return this == o || (o != null && getClass() == o.getClass());
+  }
+
+  @Override
+  public int hashCode()
+  {
+    // Constant hash, consistent with equals() treating all instances as equal.
+    return TYPE_KEY.hashCode();
+  }
+
+  @Override
+  public String toString()
+  {
+    return "LinesInputFormat{}";
+  }
+}
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/LinesReader.java
b/processing/src/main/java/org/apache/druid/data/input/impl/LinesReader.java
new file mode 100644
index 00000000000..57f32599e13
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/LinesReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.TextReader;
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reader for {@link LinesInputFormat}. Each line of text handed to this reader becomes one row containing a single
+ * column, {@link #LINE_COLUMN}, whose value is the text of that line.
+ */
+public class LinesReader extends TextReader.Strings
+{
+  /**
+   * Name of the only column produced by this reader. Public so that other classes (for example the javadoc of
+   * {@link LinesInputFormat}) can reference it.
+   */
+  public static final String LINE_COLUMN = "line";
+
+  LinesReader(
+      InputRowSchema inputRowSchema,
+      InputEntity source
+  )
+  {
+    super(inputRowSchema, source);
+  }
+
+  /**
+   * Converts one line into exactly one {@link InputRow}, parsed according to this reader's input row schema.
+   */
+  @Override
+  public List<InputRow> parseInputRows(String intermediateRow) throws ParseException
+  {
+    return List.of(MapInputRowParser.parse(getInputRowSchema(), parseLine(intermediateRow)));
+  }
+
+  /**
+   * Returns the raw map form of one line: a single-entry map from {@link #LINE_COLUMN} to the line's text.
+   */
+  @Override
+  protected List<Map<String, Object>> toMap(String intermediateRow)
+  {
+    return List.of(parseLine(intermediateRow));
+  }
+
+  private Map<String, Object> parseLine(String line)
+  {
+    return Map.of(LINE_COLUMN, line);
+  }
+
+  // This format has no header: no lines are skipped, and none is processed as a header.
+
+  @Override
+  public int getNumHeaderLinesToSkip()
+  {
+    return 0;
+  }
+
+  @Override
+  public boolean needsToProcessHeaderLine()
+  {
+    return false;
+  }
+
+  @Override
+  public void processHeaderLine(String line)
+  {
+    // Nothing to do.
+  }
+}
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
b/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
index a6b823ae891..a3057fa212b 100644
---
a/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
@@ -96,10 +96,6 @@ public class RegexInputFormat implements InputFormat
@Override
public long getWeightedSize(String path, long size)
{
-    CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
-    if (CompressionUtils.Format.GZ == compressionFormat) {
-      return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
-    }
-    return size;
+    // Compressed text expands when read, so scale the on-disk size by the factor estimated for its format.
+    final CompressionUtils.Format format = CompressionUtils.Format.fromFileName(path);
+    return size * CompressionUtils.estimatedCompressionFactor(format);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
index 7ab05b90719..01b45c5089a 100644
--- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
+++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java
@@ -32,6 +32,7 @@ import
org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorInpu
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import
org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
import org.apache.commons.compress.utils.FileNameUtils;
+import org.apache.druid.data.input.InputFormat;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
@@ -120,7 +121,7 @@ public class CompressionUtils
}
}
- public static final long COMPRESSED_TEXT_WEIGHT_FACTOR = 4L;
+ public static final int COMPRESSED_TEXT_WEIGHT_FACTOR = 4;
private static final Logger log = new Logger(CompressionUtils.class);
private static final int DEFAULT_RETRY_COUNT = 3;
private static final int GZIP_BUFFER_SIZE = 8192; // Default is 512
@@ -655,4 +656,19 @@ public class CompressionUtils
return in;
}
}
+
+  /**
+   * Returns the estimated factor by which plain text stored in the given compression format expands when read.
+   * Used by certain implementations of {@link InputFormat#getWeightedSize(String, long)} to scale on-disk sizes.
+   *
+   * @param format compression format; may be null
+   * @return {@link #COMPRESSED_TEXT_WEIGHT_FACTOR} for {@link Format#GZ}, and 1 for any other value (including null)
+   */
+  public static int estimatedCompressionFactor(@Nullable final Format format)
+  {
+    // Only GZ is weighted. That check was originally added in https://github.com/apache/druid/pull/14307 and later
+    // centralized here; it may make sense to extend it to other (all?) compression formats.
+    return format == Format.GZ ? COMPRESSED_TEXT_WEIGHT_FACTOR : 1;
+  }
}
diff --git
a/processing/src/test/java/org/apache/druid/data/input/impl/LinesInputFormatTest.java
b/processing/src/test/java/org/apache/druid/data/input/impl/LinesInputFormatTest.java
new file mode 100644
index 00000000000..e2b132a37e4
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/data/input/impl/LinesInputFormatTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.utils.CompressionUtils;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class LinesInputFormatTest
+{
+  private ObjectMapper mapper;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    mapper = TestHelper.makeJsonMapper();
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    final byte[] serialized = mapper.writeValueAsBytes(new LinesInputFormat());
+
+    // The serialized form carries the "lines" type tag...
+    final Map<String, Object> asMap = mapper.readValue(serialized, Map.class);
+    Assert.assertEquals("lines", asMap.get("type"));
+
+    // ...and deserializes back to a LinesInputFormat through the polymorphic InputFormat type.
+    final InputFormat roundTripped = mapper.readValue(serialized, InputFormat.class);
+    MatcherAssert.assertThat(roundTripped, Matchers.instanceOf(LinesInputFormat.class));
+  }
+
+  @Test
+  public void test_getWeightedSize_withoutCompression()
+  {
+    final long size = 100L;
+    Assert.assertEquals(size, new LinesInputFormat().getWeightedSize("file.txt", size));
+  }
+
+  @Test
+  public void test_getWeightedSize_withGzCompression()
+  {
+    final long size = 100L;
+    final long expected = size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
+    Assert.assertEquals(expected, new LinesInputFormat().getWeightedSize("file.txt.gz", size));
+  }
+}
diff --git
a/processing/src/test/java/org/apache/druid/data/input/impl/LinesReaderTest.java
b/processing/src/test/java/org/apache/druid/data/input/impl/LinesReaderTest.java
new file mode 100644
index 00000000000..faf7d632c0a
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/data/input/impl/LinesReaderTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class LinesReaderTest extends InitializedNullHandlingTest
+{
+  @Test
+  public void testSimpleLineParsing() throws IOException
+  {
+    assertLinesParsed("line1\nline2\nline3", "line1", "line2", "line3");
+  }
+
+  @Test
+  public void testEmptyLines() throws IOException
+  {
+    // A blank line yields an empty-string row, leading whitespace is kept, and the trailing newline adds no row.
+    assertLinesParsed("line1\n\n line3\n", "line1", "", " line3");
+  }
+
+  @Test
+  public void testSingleLine() throws IOException
+  {
+    assertLinesParsed("single line without newline", "single line without newline");
+  }
+
+  @Test
+  public void testToMap()
+  {
+    final String line = "test line";
+    final LinesReader reader = (LinesReader) createReader(line, Collections.emptyList());
+
+    final List<Map<String, Object>> maps = reader.toMap(line);
+
+    Assert.assertEquals(1, maps.size());
+    Assert.assertEquals(ImmutableMap.of("line", line), maps.get(0));
+  }
+
+  /**
+   * Reads every row from {@code input} and checks that the "line" column of each row matches {@code expected},
+   * in order, with no extra or missing rows.
+   */
+  private void assertLinesParsed(final String input, final String... expected) throws IOException
+  {
+    final List<InputRow> rows = readAllRows(createReader(input, Collections.emptyList()));
+
+    Assert.assertEquals(expected.length, rows.size());
+    for (int i = 0; i < expected.length; i++) {
+      Assert.assertEquals(expected[i], rows.get(i).getRaw("line"));
+    }
+  }
+
+  private InputEntityReader createReader(
+      String input,
+      List<String> columns
+  )
+  {
+    final InputRowSchema inputRowSchema = new InputRowSchema(
+        new TimestampSpec("__time", "auto", DateTimes.of("2000-01-01T00:00:00.000Z")),
+        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(columns)),
+        ColumnsFilter.all()
+    );
+
+    return new LinesReader(inputRowSchema, new ByteEntity(StringUtils.toUtf8(input)));
+  }
+
+  private List<InputRow> readAllRows(InputEntityReader reader) throws IOException
+  {
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      return Lists.newArrayList(iterator);
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]