[
https://issues.apache.org/jira/browse/DRILL-8179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17649852#comment-17649852
]
ASF GitHub Bot commented on DRILL-8179:
---------------------------------------
jnturton commented on code in PR #2725:
URL: https://github.com/apache/drill/pull/2725#discussion_r1053465233
##########
contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVRecordReader.java:
##########
@@ -37,34 +42,77 @@ public static void setup() throws Exception {
@Test
public void testWildcard() throws Exception {
- testBuilder()
- .sqlQuery("SELECT * FROM cp.`simple.ltsv`")
- .unOrdered()
- .baselineColumns("host", "forwardedfor", "req", "status", "size",
"referer", "ua", "reqtime", "apptime", "vhost")
- .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/xxx HTTP/1.1", "200",
"4968", "-", "Java/1.8.0_131", "2.532", "2.532", "api.example.com")
- .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/yyy HTTP/1.1", "200",
"412", "-", "Java/1.8.0_201", "3.580", "3.580", "api.example.com")
- .go();
+ String sql = "SELECT * FROM cp.`simple.ltsv`";
Review Comment:
Let's rename this class to TestLTSVQueries or similar now that LTSVRecordReader is gone?
##########
contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVBatchReader.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.ltsv;
+
+import com.github.lolo.ltsv.LtsvParser;
+import com.github.lolo.ltsv.LtsvParser.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
+
+public class LTSVBatchReader implements ManagedReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(LTSVBatchReader.class);
+ private final LTSVFormatPluginConfig config;
+ private final FileDescrip file;
+ private final CustomErrorContext errorContext;
+ private final LtsvParser ltsvParser;
+ private final RowSetLoader rowWriter;
+ private final FileSchemaNegotiator negotiator;
+ private InputStream fsStream;
+ private Iterator<Map<String, String>> rowIterator;
+
+
+ public LTSVBatchReader(LTSVFormatPluginConfig config, FileSchemaNegotiator negotiator) {
+ this.config = config;
+ this.negotiator = negotiator;
+ file = negotiator.file();
+ errorContext = negotiator.parentErrorContext();
+ ltsvParser = buildParser();
+
+ openFile();
+
+ // If there is a provided schema, import it
+ if (negotiator.providedSchema() != null) {
+ TupleMetadata schema = negotiator.providedSchema();
+ negotiator.tableSchema(schema, false);
+ }
+ ResultSetLoader loader = negotiator.build();
+ rowWriter = loader.writer();
+
+ }
+
+ private void openFile() {
+ try {
+ fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Unable to open LTSV File %s", file.split().getPath() + " "
+ e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
+ }
+ rowIterator = ltsvParser.parse(fsStream);
+ }
+
+ @Override
+ public boolean next() {
+ while (!rowWriter.isFull()) {
+ if (!processNextRow()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private LtsvParser buildParser() {
+ Builder builder = LtsvParser.builder();
+ builder.trimKeys();
+ builder.trimValues();
+ builder.skipNullValues();
+
+ if (config.getParseMode().contentEquals("strict")) {
+ builder.strict();
+ } else {
+ builder.lenient();
+ }
+
+ if (StringUtils.isNotEmpty(config.getEscapeCharacter())) {
+ builder.withEscapeChar(config.getEscapeCharacter().charAt(0));
+ }
+
+ if (StringUtils.isNotEmpty(config.getKvDelimiter())) {
+ builder.withKvDelimiter(config.getKvDelimiter().charAt(0));
+ }
+
+ if (StringUtils.isNotEmpty(config.getEntryDelimiter())) {
+ builder.withEntryDelimiter(config.getEntryDelimiter().charAt(0));
+ }
+
+ if (StringUtils.isNotEmpty(config.getLineEnding())) {
+ builder.withLineEnding(config.getLineEnding().charAt(0));
+ }
+
+ if (StringUtils.isNotEmpty(config.getQuoteChar())) {
+ builder.withQuoteChar(config.getQuoteChar().charAt(0));
+ }
+
+ return builder.build();
+ }
+
+ private boolean processNextRow() {
+ if (!rowIterator.hasNext()) {
+ return false;
+ }
+ // Start the row
+ String key;
+ String value;
+ int columnIndex;
+ ScalarWriter columnWriter;
+ Map<String, String> row = rowIterator.next();
+
+ // Skip empty lines
+ if (row.isEmpty()) {
+ return true;
+ }
+ rowWriter.start();
+ for (Map.Entry<String,String> field: row.entrySet()) {
+ key = field.getKey();
+ value = field.getValue();
+ columnIndex = getColumnIndex(key);
+ columnWriter = getColumnWriter(key);
+
+
+ if (negotiator.providedSchema() != null) {
+ // Check the type. LTSV will only read other data types if a schema is provided.
+ ColumnMetadata columnMetadata = rowWriter.tupleSchema().metadata(columnIndex);
+ MinorType dataType = columnMetadata.type();
+ LocalTime localTime;
+ LocalDate localDate;
+
+ switch (dataType) {
+ case BIT:
+ columnWriter.setBoolean(Boolean.parseBoolean(value));
+ break;
+ case INT:
+ case SMALLINT:
+ case TINYINT:
+ columnWriter.setInt(Integer.parseInt(value));
+ break;
+ case BIGINT:
+ columnWriter.setLong(Long.parseLong(value));
+ break;
+ case FLOAT8:
+ case FLOAT4:
+ columnWriter.setDouble(Double.parseDouble(value));
+ break;
+ case TIME:
+ columnMetadata = rowWriter.tupleSchema().metadata(key);
+ String dateFormat = columnMetadata.property("drill.format");
+
+ if (Strings.isNullOrEmpty(dateFormat)) {
+ localTime = LocalTime.parse(value);
+ } else {
+ localTime = LocalTime.parse(value, DateTimeFormatter.ofPattern(dateFormat));
+ }
+ columnWriter.setTime(localTime);
+ break;
+ case DATE:
+ dateFormat = columnMetadata.property("drill.format");
+
+ if (Strings.isNullOrEmpty(dateFormat)) {
+ localDate = LocalDate.parse(value);
+ } else {
+ localDate = LocalDate.parse(value, DateTimeFormatter.ofPattern(dateFormat));
+ }
+ columnWriter.setDate(localDate);
+ break;
+ case TIMESTAMP:
+ dateFormat = columnMetadata.property("drill.format");
+ Instant timestamp;
+ if (Strings.isNullOrEmpty(dateFormat)) {
+ timestamp = Instant.parse(value);
+ } else {
+ try {
+ SimpleDateFormat simpleDateFormat = new SimpleDateFormat(dateFormat);
+ Date parsedDate = simpleDateFormat.parse(value);
+ timestamp = Instant.ofEpochMilli(parsedDate.getTime());
+ } catch (ParseException e) {
+ throw UserException.parseError(e)
+ .message("Cannot parse " + value + " as a timestamp. You
can specify a format string in the provided schema to correct this.")
+ .addContext(errorContext)
+ .build(logger);
+ }
+ }
+ columnWriter.setTimestamp(timestamp);
+ break;
+ default:
+ columnWriter.setString(value);
+ }
+ } else {
+ columnWriter.setString(value);
+ }
+ }
+ // Finish the row
+ rowWriter.save();
+ return true;
+ }
+
+ @Override
+ public void close() {
+ AutoCloseables.closeSilently(fsStream);
Review Comment:
Can I propose that we add a log line at the debug level, since we're freeing an operating system resource here? We don't always log that, but I think we should, because it can help someone poring through logs while chasing a resource consumption problem.
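A minimal sketch of what that could look like, reusing the reader's existing logger and file handle (the exact message wording is only a suggestion):

    @Override
    public void close() {
      // Suggested debug log: make the release of the underlying file stream visible
      // to anyone tracing resource usage through the logs.
      logger.debug("Closing input stream for LTSV file {}", file.split().getPath());
      AutoCloseables.closeSilently(fsStream);
    }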
##########
contrib/format-ltsv/pom.xml:
##########
@@ -36,6 +36,11 @@
<artifactId>drill-java-exec</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.github.lonely-lockley</groupId>
+ <artifactId>ltsv-parser</artifactId>
Review Comment:
Reviewer's note: this library is under the Apache 2.0 license
> Convert LTSV Format Plugin to EVF2
> ----------------------------------
>
> Key: DRILL-8179
> URL: https://issues.apache.org/jira/browse/DRILL-8179
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.20.1
> Reporter: Jingchuan Hu
> Assignee: Charles Givre
> Priority: Major
> Fix For: 2.0.0
>
>
> With Charles's authorization, continuing the conversion of the LTSV format plugin to EVF2 directly.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)