[ https://issues.apache.org/jira/browse/DRILL-8179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17649870#comment-17649870 ]

ASF GitHub Bot commented on DRILL-8179:
---------------------------------------

cgivre commented on code in PR #2725:
URL: https://github.com/apache/drill/pull/2725#discussion_r1053498604


##########
contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVBatchReader.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.ltsv;
+
+import com.github.lolo.ltsv.LtsvParser;
+import com.github.lolo.ltsv.LtsvParser.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
+
+public class LTSVBatchReader implements ManagedReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(LTSVBatchReader.class);
+  private final LTSVFormatPluginConfig config;
+  private final FileDescrip file;
+  private final CustomErrorContext errorContext;
+  private final LtsvParser ltsvParser;
+  private final RowSetLoader rowWriter;
+  private final FileSchemaNegotiator negotiator;
+  private InputStream fsStream;
+  private Iterator<Map<String, String>> rowIterator;
+
+
+  public LTSVBatchReader(LTSVFormatPluginConfig config, FileSchemaNegotiator negotiator) {
+    this.config = config;
+    this.negotiator = negotiator;
+    file = negotiator.file();
+    errorContext = negotiator.parentErrorContext();
+    ltsvParser = buildParser();
+
+    openFile();
+
+    // If there is a provided schema, import it
+    if (negotiator.providedSchema() != null) {
+      TupleMetadata schema = negotiator.providedSchema();
+      negotiator.tableSchema(schema, false);
+    }
+    ResultSetLoader loader = negotiator.build();
+    rowWriter = loader.writer();
+
+  }
+
+  private void openFile() {
+    try {
+      fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
+    } catch (IOException e) {
+      throw UserException
+          .dataReadError(e)
+          .message("Unable to open LTSV File %s", file.split().getPath() + " " 
+ e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+    }
+    rowIterator = ltsvParser.parse(fsStream);
+  }
+
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processNextRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private LtsvParser buildParser() {
+    Builder builder = LtsvParser.builder();
+    builder.trimKeys();
+    builder.trimValues();
+    builder.skipNullValues();
+
+    if (config.getParseMode().contentEquals("strict")) {
+      builder.strict();
+    } else {
+      builder.lenient();
+    }
+
+    if (StringUtils.isNotEmpty(config.getEscapeCharacter())) {
+      builder.withEscapeChar(config.getEscapeCharacter().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getKvDelimiter())) {
+      builder.withKvDelimiter(config.getKvDelimiter().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getEntryDelimiter())) {
+      builder.withEntryDelimiter(config.getEntryDelimiter().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getLineEnding())) {
+      builder.withLineEnding(config.getLineEnding().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getQuoteChar())) {
+      builder.withQuoteChar(config.getQuoteChar().charAt(0));
+    }
+
+    return builder.build();
+  }
+
+  private boolean processNextRow() {
+    if (!rowIterator.hasNext()) {
+      return false;
+    }
+    // Start the row
+    String key;
+    String value;
+    int columnIndex;
+    ScalarWriter columnWriter;
+    Map<String, String> row = rowIterator.next();
+
+    // Skip empty lines
+    if (row.isEmpty()) {
+      return true;
+    }
+    rowWriter.start();
+    for (Map.Entry<String,String> field: row.entrySet()) {
+      key = field.getKey();
+      value = field.getValue();
+      columnIndex = getColumnIndex(key);
+      columnWriter = getColumnWriter(key);
+
+
+      if (negotiator.providedSchema() != null) {
+        // Check the type. LTSV will only read other data types if a schema is provided.
+        ColumnMetadata columnMetadata = rowWriter.tupleSchema().metadata(columnIndex);
+        MinorType dataType = columnMetadata.type();
+        LocalTime localTime;
+        LocalDate localDate;
+
+        switch (dataType) {
+          case BIT:
+            columnWriter.setBoolean(Boolean.parseBoolean(value));
+            break;
+          case INT:
+          case SMALLINT:
+          case TINYINT:
+            columnWriter.setInt(Integer.parseInt(value));
+            break;
+          case BIGINT:
+            columnWriter.setLong(Long.parseLong(value));
+            break;
+          case FLOAT8:
+          case FLOAT4:
+            columnWriter.setDouble(Double.parseDouble(value));
+            break;
+          case TIME:
+            columnMetadata = rowWriter.tupleSchema().metadata(key);
+            String dateFormat = columnMetadata.property("drill.format");
+
+            if (Strings.isNullOrEmpty(dateFormat)) {
+              localTime = LocalTime.parse(value);
+            } else {
+              localTime = LocalTime.parse(value, DateTimeFormatter.ofPattern(dateFormat));
+            }
+            columnWriter.setTime(localTime);
+            break;
+          case DATE:
+            dateFormat = columnMetadata.property("drill.format");
+
+            if (Strings.isNullOrEmpty(dateFormat)) {
+              localDate = LocalDate.parse(value);
+            } else {
+              localDate = LocalDate.parse(value, DateTimeFormatter.ofPattern(dateFormat));
+            }
+            columnWriter.setDate(localDate);
+            break;
+          case TIMESTAMP:
+            dateFormat = columnMetadata.property("drill.format");
+            Instant timestamp;
+            if (Strings.isNullOrEmpty(dateFormat)) {
+              timestamp = Instant.parse(value);
+            } else {
+              try {
+                SimpleDateFormat simpleDateFormat = new SimpleDateFormat(dateFormat);
+                Date parsedDate = simpleDateFormat.parse(value);
+                timestamp = Instant.ofEpochMilli(parsedDate.getTime());
+              } catch (ParseException e) {
+                throw UserException.parseError(e)
+                    .message("Cannot parse " + value + " as a timestamp. You 
can specify a format string in the provided schema to correct this.")
+                    .addContext(errorContext)
+                    .build(logger);
+              }
+            }
+            columnWriter.setTimestamp(timestamp);
+            break;
+          default:
+            columnWriter.setString(value);
+        }
+      } else {
+        columnWriter.setString(value);
+      }
+    }
+    // Finish the row
+    rowWriter.save();
+    return true;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(fsStream);

Review Comment:
   Good idea.  Done.



##########
contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVRecordReader.java:
##########
@@ -37,34 +42,77 @@ public static void setup() throws Exception {
 
   @Test
   public void testWildcard() throws Exception {
-    testBuilder()
-      .sqlQuery("SELECT * FROM cp.`simple.ltsv`")
-      .unOrdered()
-      .baselineColumns("host", "forwardedfor", "req", "status", "size", "referer", "ua", "reqtime", "apptime", "vhost")
-      .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/xxx HTTP/1.1", "200", "4968", "-", "Java/1.8.0_131", "2.532", "2.532", "api.example.com")
-      .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/yyy HTTP/1.1", "200", "412", "-", "Java/1.8.0_201", "3.580", "3.580", "api.example.com")
-      .go();
+    String sql = "SELECT * FROM cp.`simple.ltsv`";

Review Comment:
   Done





> Convert LTSV Format Plugin to EVF2
> ----------------------------------
>
>                 Key: DRILL-8179
>                 URL: https://issues.apache.org/jira/browse/DRILL-8179
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.20.1
>            Reporter: Jingchuan Hu
>            Assignee: Charles Givre
>            Priority: Major
>             Fix For: 2.0.0
>
>
> With Charles's authorization, continue converting the LTSV format plugin to EVF2 directly.
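
As background on the conversion itself, moving an Easy format plugin to EVF V2 also involves registering the batch reader through the V2 scan builder on the plugin class. The sketch below shows roughly what that hook could look like for this plugin; the configureScan override, FileScanLifecycleBuilder, and FileReaderFactory names are assumed from Drill's scan.v3 framework as used by other converted plugins, and this is an illustration, not the code from PR #2725.

    // Sketch only: inside LTSVFormatPlugin (extends EasyFormatPlugin<LTSVFormatPluginConfig>).
    @Override
    protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
      // Missing or unprojected columns default to nullable VARCHAR.
      builder.nullType(Types.optional(MinorType.VARCHAR));
      builder.readerFactory(new FileReaderFactory() {
        @Override
        public ManagedReader newReader(FileSchemaNegotiator negotiator) {
          // Hand each file split to the EVF2 reader shown in the diff above.
          return new LTSVBatchReader(getConfig(), negotiator);
        }
      });
    }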



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
