vdiravka commented on a change in pull request #2192:
URL: https://github.com/apache/drill/pull/2192#discussion_r602925295



##########
File path: 
contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/plugin/PcapFormatConfig.java
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.pcapng;
+package org.apache.drill.exec.store.plugin;

Review comment:
       It was unintentional. I intended to add an additional package `plugin` to 
separate out the classes related to `FormatPlugin` and `FormatConfig` from the actual 
`pcap` and `pcapng` readers.
   So the actual package is `package org.apache.drill.exec.store.pcap.plugin;`

##########
File path: 
contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/plugin/PcapFormatConfig.java
##########
@@ -29,18 +29,23 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
-@JsonTypeName(PcapngFormatConfig.NAME)
+@JsonTypeName(PcapFormatConfig.NAME)
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-public class PcapngFormatConfig implements FormatPluginConfig {
+public class PcapFormatConfig implements FormatPluginConfig {
+  private static final List<String> DEFAULT_EXTNS = ImmutableList.of("pcap", 
"pcapng");
 
-  public static final String NAME = "pcapng";
+  public static final String NAME = "pcap";

Review comment:
       You are right. Backward compatibility wasn't preserved.
   I'll change it by adding two additional classes `PcapngFormatConfig`, 
`PcapngFormatPlugin` and mark them as deprecated, since they are needed only 
for older plugin configs.
   
   A unit test for this is really tricky, since JSON deserialization is possible 
only while the formats are being included during bootstrap. So test cases 
with the following format plugin initialization were added:
   `cluster.defineFormat("dfs", "pcapng", new 
PcapFormatConfig(ImmutableList.of("pcapng"), true, false));`

##########
File path: 
contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/plugin/PcapFormatPlugin.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plugin;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.store.pcap.PcapBatchReader;
+import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
+import org.apache.drill.exec.store.pcapng.PcapngBatchReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.regex.Pattern;
+
+public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> {
+
+  static final Logger logger = 
LoggerFactory.getLogger(ManagedScanFramework.class);
+  private static PacketDecoder.FileFormat fileFormat = 
PacketDecoder.FileFormat.UNKNOWN;
+
+  public PcapFormatPlugin(String name,
+                          DrillbitContext context,
+                          Configuration fsConf,
+                          StoragePluginConfig storageConfig,
+                          PcapFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, 
formatConfig);
+  }
+
+  private static EasyFormatConfig easyConfig(Configuration fsConf, 
PcapFormatConfig pluginConfig) {
+    return EasyFormatConfig.builder()
+        .readable(true)
+        .writable(false)
+        .blockSplittable(false)
+        .compressible(true)
+        .extensions(pluginConfig.getExtensions())
+        .fsConf(fsConf)
+        .useEnhancedScan(true)
+        .supportsLimitPushdown(true)
+        .supportsProjectPushdown(true)
+        .defaultName(PcapFormatConfig.NAME)
+        .build();
+  }
+
+  private static class PcapReaderFactory extends FileReaderFactory {
+
+    private final PcapFormatConfig config;
+    private final EasySubScan scan;
+
+    public PcapReaderFactory(PcapFormatConfig config, EasySubScan scan) {
+      this.config = config;
+      this.scan = scan;
+    }
+
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      fileFormat = fromMagicNumber(fileFramework);
+      return createReader(scan, config);
+    }
+  }
+
+  @Override
+  public ManagedReader<? extends FileSchemaNegotiator> 
newBatchReader(EasySubScan scan, OptionManager options) {
+    return createReader(scan, formatConfig);
+  }
+
+  private static ManagedReader<? extends FileSchemaNegotiator> 
createReader(EasySubScan scan, PcapFormatConfig config) {
+    switch(fileFormat) {
+      case PCAPNG: return new PcapngBatchReader(config, scan);
+      case PCAP:
+      case UNKNOWN:
+      default: return new PcapBatchReader(config, scan.getMaxRecords());
+    }
+  }
+
+  @Override
+  protected FileScanBuilder frameworkBuilder(OptionManager options, 
EasySubScan scan) {
+    FileScanBuilder builder = new FileScanBuilder();
+    builder.setReaderFactory(new PcapReaderFactory(formatConfig, scan));
+
+    initScanBuilder(builder, scan);
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+    return builder;
+  }
+
+  /**
+   * Helper method to detect PCAP or PCAPNG file format based on file Magic 
Number
+   *
+   * @param fileFramework for obtaining InputStream
+   * @return PCAP/PCAPNG file format
+   */
+  private static PacketDecoder.FileFormat fromMagicNumber(FileScanFramework 
fileFramework) {
+    FileScanFramework.FileSchemaNegotiatorImpl negotiator = 
(FileScanFramework.FileSchemaNegotiatorImpl) fileFramework.newNegotiator();
+    DrillFileSystem dfs = negotiator.fileSystem();
+    Path path = dfs.makeQualified(negotiator.split().getPath());
+    try (InputStream inputStream = dfs.openPossiblyCompressedStream(path)) {
+      PacketDecoder decoder = new PacketDecoder(inputStream);
+      return decoder.getFileFormat();
+    } catch (IOException io) {
+      throw UserException
+              .dataReadError(io)
+              .addContext("File name:", path.toString())
+              .build(logger);
+    }
+  }
+
+  /**
+   * Detects PCAP or PCAPNG file format based on file extension. Not used now 
due to {@link #fromMagicNumber} usage for
+   * this purpose

Review comment:
       Agreed. Removed

##########
File path: 
contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/plugin/PcapFormatPlugin.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.plugin;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;

Review comment:
       It is used in almost all format plugins, so it looks fine to refactor it 
within the scope of the "Storage plugins improvement" work.
   @vvysotskyi Do we have a ticket for it?

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
##########
@@ -103,7 +103,7 @@ protected void configure() {
   }
 
   @Override
-  protected SchemaNegotiatorImpl newNegotiator() {
+  public SchemaNegotiatorImpl newNegotiator() {

Review comment:
       I agree that the negotiator is an internal detail of `fileFramework`, so it is 
not good to make it public.
   I changed it by adding a `fileSystem()` method to `FileScanFramework`. To me 
it looks essential here.
   
   I think there is no need to create something more powerful just for 
obtaining the `DrillFileSystem` object.

##########
File path: 
contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/plugin/PcapFormatConfig.java
##########
@@ -29,18 +29,23 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
-@JsonTypeName(PcapngFormatConfig.NAME)
+@JsonTypeName(PcapFormatConfig.NAME)
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-public class PcapngFormatConfig implements FormatPluginConfig {
+public class PcapFormatConfig implements FormatPluginConfig {
+  private static final List<String> DEFAULT_EXTNS = ImmutableList.of("pcap", 
"pcapng");
 
-  public static final String NAME = "pcapng";
+  public static final String NAME = "pcap";
   private final List<String> extensions;
   private final boolean stat;
+  private final boolean sessionizeTCPStreams;
 
   @JsonCreator
-  public PcapngFormatConfig(@JsonProperty("extensions") List<String> 
extensions, @JsonProperty("stat") boolean stat) {
-    this.extensions = extensions == null ? 
ImmutableList.of(PcapngFormatConfig.NAME) : ImmutableList.copyOf(extensions);
+  public PcapFormatConfig(@JsonProperty("extensions") List<String> extensions,
+                          @JsonProperty("stat") boolean stat,
+                          @JsonProperty("sessionizeTCPStreams") Boolean 
sessionizeTCPStreams) {

Review comment:
       Agreed, but it was not introduced by me. This field was present in the 
original `pcap` format config:
   ```
     @JsonCreator
     public PcapFormatConfig(
         @JsonProperty("extensions") List<String> extensions,
         @JsonProperty("sessionizeTCPStreams") Boolean sessionizeTCPStreams) {
   ```

##########
File path: 
contrib/format-pcapng/src/main/resources/bootstrap-format-plugins.json
##########
@@ -3,26 +3,29 @@
     "dfs": {
       "type": "file",
       "formats": {
-        "pcapng": {
-          "type": "pcapng",
+        "pcap": {
+          "type": "pcap",

Review comment:
       resolved

##########
File path: 
contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
##########
@@ -48,126 +49,61 @@
   private static final Logger logger = 
LoggerFactory.getLogger(PcapBatchReader.class);
 
   private FileSplit split;
-
   private PacketDecoder decoder;
-
   private InputStream fsStream;
-
   private RowSetLoader rowWriter;
-
   private int validBytes;
-
   private byte[] buffer;
-
   private int offset;
-
   private ScalarWriter typeWriter;
-
   private ScalarWriter timestampWriter;
-
   private ScalarWriter timestampMicroWriter;
-
   private ScalarWriter networkWriter;
-
   private ScalarWriter srcMacAddressWriter;
-
   private ScalarWriter dstMacAddressWriter;
-
   private ScalarWriter dstIPWriter;
-
   private ScalarWriter srcIPWriter;
-
   private ScalarWriter srcPortWriter;
-
   private ScalarWriter dstPortWriter;
-
   private ScalarWriter packetLengthWriter;
-
   private ScalarWriter tcpSessionWriter;
-
   private ScalarWriter tcpSequenceWriter;
-
   private ScalarWriter tcpAckNumberWriter;
-
   private ScalarWriter tcpFlagsWriter;
-
   private ScalarWriter tcpParsedFlagsWriter;
-
   private ScalarWriter tcpNsWriter;
-
   private ScalarWriter tcpCwrWriter;
-
   private ScalarWriter tcpEceWriter;
-
   private ScalarWriter tcpFlagsEceEcnCapableWriter;
-
   private ScalarWriter tcpFlagsCongestionWriter;
-
   private ScalarWriter tcpUrgWriter;
-
   private ScalarWriter tcpAckWriter;
-
   private ScalarWriter tcpPshWriter;
-
   private ScalarWriter tcpRstWriter;
-
   private ScalarWriter tcpSynWriter;
-
   private ScalarWriter tcpFinWriter;
-
   private ScalarWriter dataWriter;
-
   private ScalarWriter isCorruptWriter;
-
-  private final PcapReaderConfig readerConfig;
-
-
+  private final PcapFormatConfig readerConfig;
   // Writers for TCP Sessions
   private ScalarWriter sessionStartTimeWriter;
-
   private ScalarWriter sessionEndTimeWriter;
-
   private ScalarWriter sessionDurationWriter;
-
   private ScalarWriter connectionTimeWriter;
-
   private ScalarWriter packetCountWriter;
-
   private ScalarWriter originPacketCounterWriter;
-
   private ScalarWriter remotePacketCounterWriter;
-
   private ScalarWriter originDataVolumeWriter;
-
   private ScalarWriter remoteDataVolumeWriter;
-
   private ScalarWriter hostDataWriter;
-
   private ScalarWriter remoteDataWriter;
-
   private final int maxRecords;
-
   private Map<Long, TcpSession> sessionQueue;
 
 
-  public static class PcapReaderConfig {
-
-    protected final PcapFormatPlugin plugin;
-
-    public boolean sessionizeTCPStreams;
-
-    private final PcapFormatConfig config;
-
-    public PcapReaderConfig(PcapFormatPlugin plugin) {
-      this.plugin = plugin;
-      this.config = plugin.getConfig();
-      this.sessionizeTCPStreams = config.getSessionizeTCPStreams();
-    }
-  }
-
-  public PcapBatchReader(PcapReaderConfig readerConfig, int maxRecords) {
+  public PcapBatchReader(PcapFormatConfig readerConfig, int maxRecords) {
     this.readerConfig = readerConfig;
-    if (readerConfig.sessionizeTCPStreams) {
+    if (readerConfig.getSessionizeTCPStreams()) {

Review comment:
       It makes sense. That means these fields need to be made 
`public`. But such fields in other plugins are also private. Does that mean 
they should eventually be changed to public too?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to