This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new f3a4387b25 DRILL-8310: Convert Syslog Format to EVF V2 (#2653)
f3a4387b25 is described below

commit f3a4387b25c613bf0f661c2a51fbf3e628bfec50
Author: Charles S. Givre <[email protected]>
AuthorDate: Tue Sep 20 08:08:02 2022 -0400

    DRILL-8310: Convert Syslog Format to EVF V2 (#2653)
---
 .../drill/exec/store/syslog/SyslogBatchReader.java | 30 ++++++-------------
 .../exec/store/syslog/SyslogFormatPlugin.java      | 34 +++++++---------------
 2 files changed, 19 insertions(+), 45 deletions(-)

diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java
index 457375315e..f5284ac6e1 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java
@@ -23,8 +23,9 @@ import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -34,7 +35,6 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
-import org.apache.hadoop.mapred.FileSplit;
 import org.realityforge.jsyslog.message.StructuredDataParameter;
 import org.realityforge.jsyslog.message.SyslogMessage;
 import org.slf4j.Logger;
@@ -50,13 +50,11 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-public class SyslogBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class SyslogBatchReader implements ManagedReader {
   private static final Logger logger = LoggerFactory.getLogger(SyslogBatchReader.class);
   private final String STRUCTURED_DATA_PREFIX = "structured_data_";
   private final String STRUCTURED_DATA_MAP_NAME = "structured_data";
   private final String RAW_COLUMN_NAME = "_raw";
-
-  private final int maxRecords;
   private final SyslogFormatConfig config;
   private final EasySubScan subScan;
   private final Map<String, MinorType> mappedColumns = new LinkedHashMap<>();
@@ -64,7 +62,7 @@ public class SyslogBatchReader implements ManagedReader<FileSchemaNegotiator> {
   private int errorCount;
   private CustomErrorContext errorContext;
   private InputStream fsStream;
-  private FileSplit split;
+  private final FileDescrip file;
   private BufferedReader reader;
   private RowSetLoader rowWriter;
   private List<ScalarWriter> writerArray;
@@ -73,16 +71,12 @@ public class SyslogBatchReader implements ManagedReader<FileSchemaNegotiator> {
   private TupleWriter structuredDataWriter;
 
 
-  public SyslogBatchReader(int maxRecords, SyslogFormatConfig config, EasySubScan scan) {
-    this.maxRecords = maxRecords;
+  public SyslogBatchReader(SyslogFormatConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
     this.config = config;
     this.subScan = scan;
     populateMappedColumns();
-  }
 
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-    split = negotiator.split();
+    file = negotiator.file();
     openFile(negotiator);
     negotiator.tableSchema(buildSchema(), false);
     errorContext = negotiator.parentErrorContext();
@@ -92,7 +86,6 @@ public class SyslogBatchReader implements ManagedReader<FileSchemaNegotiator> {
     writerArray = populateRowWriters();
     rawColumnWriter = rowWriter.scalar(RAW_COLUMN_NAME);
     messageWriter = rowWriter.scalar("message");
-    return true;
   }
 
   @Override
@@ -120,11 +113,11 @@ public class SyslogBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   private void openFile(FileSchemaNegotiator negotiator) {
     try {
-      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
     } catch (IOException e) {
       throw UserException
         .dataReadError(e)
-        .message("Unable to open Syslog File %s", split.getPath())
+        .message("Unable to open Syslog File %s", file.split().getPath())
         .addContext(e.getMessage())
         .addContext(errorContext)
         .build(logger);
@@ -179,11 +172,6 @@ public class SyslogBatchReader implements ManagedReader<FileSchemaNegotiator> {
   }
 
   private boolean processNextLine() {
-    // Check to see if the limit has been reached
-    if (rowWriter.limitReached(maxRecords)) {
-      return false;
-    }
-
     String line;
     try {
       line = reader.readLine();
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
index c8af2325e7..3e52f46fbc 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
@@ -21,15 +21,13 @@ package org.apache.drill.exec.store.syslog;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> {
@@ -37,20 +35,17 @@ public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> {
   public static final String DEFAULT_NAME = "syslog";
 
   private static class SyslogReaderFactory extends FileReaderFactory {
-
-    private final int maxRecords;
     private final SyslogFormatConfig formatConfig;
     private final EasySubScan scan;
 
-    public SyslogReaderFactory(int maxRecords, SyslogFormatConfig formatConfig, EasySubScan scan) {
-      this.maxRecords = maxRecords;
+    public SyslogReaderFactory(SyslogFormatConfig formatConfig, EasySubScan scan) {
       this.formatConfig = formatConfig;
       this.scan = scan;
     }
 
     @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      return new SyslogBatchReader(maxRecords, formatConfig, scan);
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+      return new SyslogBatchReader(formatConfig, scan, negotiator);
     }
   }
 
@@ -70,23 +65,14 @@ public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> {
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
         .defaultName(DEFAULT_NAME)
-        .scanVersion(ScanFrameworkVersion.EVF_V1)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
         .supportsLimitPushdown(true)
         .build();
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options)  {
-    return new SyslogBatchReader(scan.getMaxRecords(), formatConfig, scan);
-  }
-
-  @Override
-  protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
-    FileScanBuilder builder = new FileScanBuilder();
-    builder.setReaderFactory(new SyslogReaderFactory(scan.getMaxRecords(), formatConfig, scan));
-
-    initScanBuilder(builder, scan);
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
-    return builder;
+    builder.readerFactory(new SyslogReaderFactory(formatConfig, scan));
   }
 }

Reply via email to