[ 
https://issues.apache.org/jira/browse/DRILL-7533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256175#comment-17256175
 ] 

ASF GitHub Bot commented on DRILL-7533:
---------------------------------------

cgivre commented on a change in pull request #2130:
URL: https://github.com/apache/drill/pull/2130#discussion_r549714803



##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapColumn.java
##########
@@ -0,0 +1,1031 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.store.pcap.PcapFormatUtils;
+import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.joda.time.Instant;
+
+import fr.bmartel.pcapdecoder.structure.options.inter.IOptionsStatisticsHeader;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IDescriptionBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import fr.bmartel.pcapdecoder.structure.types.inter.INameResolutionBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.ISectionHeaderBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.IStatisticsBlock;
+
+public abstract class PcapColumn {
+
+  private static final Map<String, PcapColumn> columns = new LinkedHashMap<>();
+  private static final Map<String, PcapColumn> summary_columns = new 
LinkedHashMap<>();
+  public static final String DUMMY_NAME = "dummy";
+  public static final String PATH_NAME = "path";
+
+  static {
+    // Basic
+    columns.put("timestamp", new PcapTimestamp());
+    columns.put("packet_length", new PcapPacketLength());
+    columns.put("type", new PcapType());
+    columns.put("src_ip", new PcapSrcIp());
+    columns.put("dst_ip", new PcapDstIp());
+    columns.put("src_port", new PcapSrcPort());
+    columns.put("dst_port", new PcapDstPort());
+    columns.put("src_mac_address", new PcapSrcMac());
+    columns.put("dst_mac_address", new PcapDstMac());
+    columns.put("tcp_session", new PcapTcpSession());
+    columns.put("tcp_ack", new PcapTcpAck());
+    columns.put("tcp_flags", new PcapTcpFlags());
+    columns.put("tcp_flags_ns", new PcapTcpFlagsNs());
+    columns.put("tcp_flags_cwr", new PcapTcpFlagsCwr());
+    columns.put("tcp_flags_ece", new PcapTcpFlagsEce());
+    columns.put("tcp_flags_ece_ecn_capable", new PcapTcpFlagsEceEcnCapable());
+    columns.put("tcp_flags_ece_congestion_experienced", new 
PcapTcpFlagsEceCongestionExperienced());
+    columns.put("tcp_flags_urg", new PcapTcpFlagsUrg());
+    columns.put("tcp_flags_ack", new PcapTcpFlagsAck());
+    columns.put("tcp_flags_psh", new PcapTcpFlagsPsh());
+    columns.put("tcp_flags_rst", new PcapTcpFlagsRst());
+    columns.put("tcp_flags_syn", new PcapTcpFlagsSyn());
+    columns.put("tcp_flags_fin", new PcapTcpFlagsFin());
+    columns.put("tcp_parsed_flags", new PcapTcpParsedFlags());
+    columns.put("packet_data", new PcapPacketData());

Review comment:
       In the regular PCAP reader, we added a boolean column called 
`is_corrupt` or something like that.  Basically, instead of crashing on corrupt 
data, Drill catches it, marks that flag as true and keeps reading.  Could we 
add that column here as well?

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapColumn.java
##########
@@ -0,0 +1,1031 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.store.pcap.PcapFormatUtils;
+import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.joda.time.Instant;
+
+import fr.bmartel.pcapdecoder.structure.options.inter.IOptionsStatisticsHeader;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IDescriptionBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import fr.bmartel.pcapdecoder.structure.types.inter.INameResolutionBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.ISectionHeaderBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.IStatisticsBlock;
+
+public abstract class PcapColumn {
+
+  private static final Map<String, PcapColumn> columns = new LinkedHashMap<>();
+  private static final Map<String, PcapColumn> summary_columns = new 
LinkedHashMap<>();
+  public static final String DUMMY_NAME = "dummy";
+  public static final String PATH_NAME = "path";
+
+  static {
+    // Basic
+    columns.put("timestamp", new PcapTimestamp());
+    columns.put("packet_length", new PcapPacketLength());
+    columns.put("type", new PcapType());
+    columns.put("src_ip", new PcapSrcIp());
+    columns.put("dst_ip", new PcapDstIp());
+    columns.put("src_port", new PcapSrcPort());
+    columns.put("dst_port", new PcapDstPort());
+    columns.put("src_mac_address", new PcapSrcMac());
+    columns.put("dst_mac_address", new PcapDstMac());
+    columns.put("tcp_session", new PcapTcpSession());
+    columns.put("tcp_ack", new PcapTcpAck());
+    columns.put("tcp_flags", new PcapTcpFlags());
+    columns.put("tcp_flags_ns", new PcapTcpFlagsNs());
+    columns.put("tcp_flags_cwr", new PcapTcpFlagsCwr());
+    columns.put("tcp_flags_ece", new PcapTcpFlagsEce());
+    columns.put("tcp_flags_ece_ecn_capable", new PcapTcpFlagsEceEcnCapable());
+    columns.put("tcp_flags_ece_congestion_experienced", new 
PcapTcpFlagsEceCongestionExperienced());
+    columns.put("tcp_flags_urg", new PcapTcpFlagsUrg());
+    columns.put("tcp_flags_ack", new PcapTcpFlagsAck());
+    columns.put("tcp_flags_psh", new PcapTcpFlagsPsh());
+    columns.put("tcp_flags_rst", new PcapTcpFlagsRst());
+    columns.put("tcp_flags_syn", new PcapTcpFlagsSyn());
+    columns.put("tcp_flags_fin", new PcapTcpFlagsFin());
+    columns.put("tcp_parsed_flags", new PcapTcpParsedFlags());
+    columns.put("packet_data", new PcapPacketData());
+
+    // Extensions
+    summary_columns.put("path", new PcapStatPath());
+    // Section Header Block
+    summary_columns.put("shb_hardware", new PcapHardware());
+    summary_columns.put("shb_os", new PcapOS());
+    summary_columns.put("shb_userappl", new PcapUserAppl());
+    // Interface Description Block
+    summary_columns.put("if_name", new PcapIfName());
+    summary_columns.put("if_description", new PcapIfDescription());
+    summary_columns.put("if_ipv4addr", new PcapIfIPv4addr());
+    summary_columns.put("if_ipv6addr", new PcapIfIPv6addr());
+    summary_columns.put("if_macaddr", new PcapIfMACaddr());
+    summary_columns.put("if_euiaddr", new PcapIfEUIaddr());
+    summary_columns.put("if_speed", new PcapIfSpeed());
+    summary_columns.put("if_tsresol", new PcapIfTsresol());
+    summary_columns.put("if_tzone", new PcapIfTzone());
+    summary_columns.put("if_os", new PcapIfOS());
+    summary_columns.put("if_fcslen", new PcapIfFcslen());
+    summary_columns.put("if_tsoffset", new PcapIfTsOffset());
+    // Name Resolution Block
+    summary_columns.put("ns_dnsname", new PcapDnsName());
+    summary_columns.put("ns_dnsip4addr", new PcapDnsIP4addr());
+    summary_columns.put("ns_dnsip6addr", new PcapDnsIP6addr());
+    // Interface Statistics Block
+    summary_columns.put("isb_starttime", new PcapIsbStarttime());
+    summary_columns.put("isb_endtime", new PcapIsbEndtime());
+    summary_columns.put("isb_ifrecv", new PcapIsbIfrecv());
+    summary_columns.put("isb_ifdrop", new PcapIsbIfdrop());
+    summary_columns.put("isb_filteraccept", new PcapIsbFilterAccept());
+    summary_columns.put("isb_osdrop", new PcapIsbOSdrop());
+    summary_columns.put("isb_usrdeliv", new PcapIsbUsrdeliv());

Review comment:
       I'm getting an IDE warning here about possibly causing class deadlock.  
Could you please take a look?

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapColumn.java
##########
@@ -0,0 +1,1031 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.store.pcap.PcapFormatUtils;
+import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.joda.time.Instant;
+
+import fr.bmartel.pcapdecoder.structure.options.inter.IOptionsStatisticsHeader;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IDescriptionBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import fr.bmartel.pcapdecoder.structure.types.inter.INameResolutionBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.ISectionHeaderBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.IStatisticsBlock;
+
+public abstract class PcapColumn {
+
+  private static final Map<String, PcapColumn> columns = new LinkedHashMap<>();
+  private static final Map<String, PcapColumn> summary_columns = new 
LinkedHashMap<>();
+  public static final String DUMMY_NAME = "dummy";
+  public static final String PATH_NAME = "path";
+
+  static {
+    // Basic
+    columns.put("timestamp", new PcapTimestamp());
+    columns.put("packet_length", new PcapPacketLength());
+    columns.put("type", new PcapType());
+    columns.put("src_ip", new PcapSrcIp());
+    columns.put("dst_ip", new PcapDstIp());
+    columns.put("src_port", new PcapSrcPort());
+    columns.put("dst_port", new PcapDstPort());
+    columns.put("src_mac_address", new PcapSrcMac());
+    columns.put("dst_mac_address", new PcapDstMac());
+    columns.put("tcp_session", new PcapTcpSession());
+    columns.put("tcp_ack", new PcapTcpAck());
+    columns.put("tcp_flags", new PcapTcpFlags());
+    columns.put("tcp_flags_ns", new PcapTcpFlagsNs());
+    columns.put("tcp_flags_cwr", new PcapTcpFlagsCwr());
+    columns.put("tcp_flags_ece", new PcapTcpFlagsEce());
+    columns.put("tcp_flags_ece_ecn_capable", new PcapTcpFlagsEceEcnCapable());
+    columns.put("tcp_flags_ece_congestion_experienced", new 
PcapTcpFlagsEceCongestionExperienced());
+    columns.put("tcp_flags_urg", new PcapTcpFlagsUrg());
+    columns.put("tcp_flags_ack", new PcapTcpFlagsAck());
+    columns.put("tcp_flags_psh", new PcapTcpFlagsPsh());
+    columns.put("tcp_flags_rst", new PcapTcpFlagsRst());
+    columns.put("tcp_flags_syn", new PcapTcpFlagsSyn());
+    columns.put("tcp_flags_fin", new PcapTcpFlagsFin());
+    columns.put("tcp_parsed_flags", new PcapTcpParsedFlags());
+    columns.put("packet_data", new PcapPacketData());
+
+    // Extensions
+    summary_columns.put("path", new PcapStatPath());
+    // Section Header Block
+    summary_columns.put("shb_hardware", new PcapHardware());
+    summary_columns.put("shb_os", new PcapOS());
+    summary_columns.put("shb_userappl", new PcapUserAppl());
+    // Interface Description Block
+    summary_columns.put("if_name", new PcapIfName());
+    summary_columns.put("if_description", new PcapIfDescription());
+    summary_columns.put("if_ipv4addr", new PcapIfIPv4addr());
+    summary_columns.put("if_ipv6addr", new PcapIfIPv6addr());
+    summary_columns.put("if_macaddr", new PcapIfMACaddr());
+    summary_columns.put("if_euiaddr", new PcapIfEUIaddr());
+    summary_columns.put("if_speed", new PcapIfSpeed());
+    summary_columns.put("if_tsresol", new PcapIfTsresol());
+    summary_columns.put("if_tzone", new PcapIfTzone());
+    summary_columns.put("if_os", new PcapIfOS());
+    summary_columns.put("if_fcslen", new PcapIfFcslen());
+    summary_columns.put("if_tsoffset", new PcapIfTsOffset());
+    // Name Resolution Block
+    summary_columns.put("ns_dnsname", new PcapDnsName());
+    summary_columns.put("ns_dnsip4addr", new PcapDnsIP4addr());
+    summary_columns.put("ns_dnsip6addr", new PcapDnsIP6addr());
+    // Interface Statistics Block
+    summary_columns.put("isb_starttime", new PcapIsbStarttime());
+    summary_columns.put("isb_endtime", new PcapIsbEndtime());
+    summary_columns.put("isb_ifrecv", new PcapIsbIfrecv());
+    summary_columns.put("isb_ifdrop", new PcapIsbIfdrop());
+    summary_columns.put("isb_filteraccept", new PcapIsbFilterAccept());
+    summary_columns.put("isb_osdrop", new PcapIsbOSdrop());
+    summary_columns.put("isb_usrdeliv", new PcapIsbUsrdeliv());
+  }
+
+  abstract MajorType getType();
+
+  abstract void process(IPcapngType block, ScalarWriter writer);
+
+  public static Map<String, PcapColumn> getColumns() {
+    return Collections.unmodifiableMap(columns);
+  }
+
+  public static Map<String, PcapColumn> getSummaryColumns() {
+    return Collections.unmodifiableMap(summary_columns);
+  }
+
+  static class PcapDummy extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) { }
+  }
+
+  static class PcapDummyArray extends PcapColumn {

Review comment:
       This class doesn't appear to be used.  Can you please remove it if it is 
unnecessary?

##########
File path: 
exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
##########
@@ -17,84 +17,178 @@
  */
 package org.apache.drill.exec.store.pcapng;
 
-import org.apache.drill.PlanTestBase;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.UserRemoteException;
-import org.junit.Assert;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.joda.time.Instant;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
-import java.nio.file.Paths;
+@Category(RowSetTests.class)
+public class TestPcapngRecordReader extends ClusterTest {
 
-public class TestPcapngRecordReader extends PlanTestBase {
   @BeforeClass
-  public static void setupTestFiles() {
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
     dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng"));
   }
 
   @Test
   public void testStarQuery() throws Exception {
-    Assert.assertEquals(123, testSql("select * from 
dfs.`store/pcapng/sniff.pcapng`"));
-    Assert.assertEquals(1, testSql("select * from 
dfs.`store/pcapng/example.pcapng`"));
+    String sql = "select * from dfs.`store/pcapng/sniff.pcapng`";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    assertEquals(123, sets.rowCount());
+    sets.clear();
   }
 
   @Test
-  public void testProjectingByName() throws Exception {
-    Assert.assertEquals(123, testSql("select `timestamp`, packet_data, type 
from dfs.`store/pcapng/sniff.pcapng`"));
-    Assert.assertEquals(1, testSql("select src_ip, dst_ip, `timestamp` from 
dfs.`store/pcapng/example.pcapng`"));
+  public void testExplicitQuery() throws Exception {
+    String sql = "select type, packet_length, `timestamp` from 
dfs.`store/pcapng/sniff.pcapng` where type = 'ARP'";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("type", MinorType.VARCHAR)
+        .add("packet_length", MinorType.INT)
+        .add("timestamp", MinorType.TIMESTAMP)
+        .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow("ARP", 90, Instant.ofEpochMilli(1518010669927L))
+        .addRow("ARP", 90, Instant.ofEpochMilli(1518010671874L))
+        .build();
+
+    assertEquals(2, sets.rowCount());
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testLimitPushdown() throws Exception {
+    String sql = "select * from dfs.`store/pcapng/sniff.pcapng` where type = 
'UDP' limit 10 offset 65";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    assertEquals(6, sets.rowCount());
+    sets.clear();
   }
 
   @Test
-  public void testDiffCaseQuery() throws Exception {
-    Assert.assertEquals(123, testSql("select `timestamp`, paCket_dAta, TyPe 
from dfs.`store/pcapng/sniff.pcapng`"));
-    Assert.assertEquals(1, testSql("select src_ip, dst_ip, `Timestamp` from 
dfs.`store/pcapng/example.pcapng`"));
+  public void testSerDe() throws Exception {
+    String sql = "select count(*) from dfs.`store/pcapng/example.pcapng`";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+
+    assertEquals("Counts should match", 1, cnt);
   }
 
   @Test
-  public void testProjectingMissColls() throws Exception {
-    Assert.assertEquals(123, testSql("select `timestamp`, `name`, `color` from 
dfs.`store/pcapng/sniff.pcapng`"));
-    Assert.assertEquals(1, testSql("select src_ip, `time` from 
dfs.`store/pcapng/example.pcapng`"));
+  public void testCaseInsensitiveQuery() throws Exception {
+    String sql = "select `timestamp`, paCket_dAta, TyPe from 
dfs.`store/pcapng/sniff.pcapng`";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    assertEquals(123, sets.rowCount());
+    sets.clear();
   }
 
+  @Test

Review comment:
       Could you please add a unit test for compressed files?
   Here's an example:
   
   
https://github.com/apache/drill/blob/8f892b3c9b04e5e0ff1973681ff862da857d22ef/contrib/format-spss/src/test/java/org/apache/drill/exec/store/spss/TestSpssReader.java#L141-L162
   
   
   

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import fr.bmartel.pcapdecoder.PcapDecoder;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+
+public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(PcapngBatchReader.class);
+
+  private final PcapngFormatConfig config;
+  private final EasySubScan scan;
+  private final int maxRecords;
+  private CustomErrorContext errorContext;
+  private List<SchemaPath> columns;
+  private List<ColumnDefn> projectedColumns;
+  private Iterator<IPcapngType> pcapIterator;
+  private RowSetLoader loader;
+  private InputStream in;
+  private Path path;
+
+  public PcapngBatchReader(final PcapngFormatConfig config, final EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+    this.maxRecords = scan.getMaxRecords();
+    this.columns = scan.getColumns();
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    try {
+      // init InputStream for pcap file
+      errorContext = negotiator.parentErrorContext();
+      DrillFileSystem dfs = negotiator.fileSystem();
+      path = dfs.makeQualified(negotiator.split().getPath());
+      in = dfs.openPossiblyCompressedStream(path);
+      // decode the pcap file
+      PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
+      decoder.decode();
+      pcapIterator = decoder.getSectionList().iterator();
+      logger.debug("The config is {}, root is {}, columns has {}", config, 
scan.getSelectionRoot(), columns);
+    } catch (IOException e) {
+      throw UserException
+             .dataReadError(e)
+             .message("Failure in initial pcapng inputstream. " + 
e.getMessage())
+             .addContext(errorContext)
+             .build(logger);
+    } catch (Exception e) {
+      throw UserException
+             .dataReadError(e)
+             .message("Failed to decode the pcapng file. " + e.getMessage())
+             .addContext(errorContext)
+             .build(logger);
+    }
+    // define the schema
+    negotiator.tableSchema(defineMetadata(), true);
+    ResultSetLoader resultSetLoader = negotiator.build();
+    loader = resultSetLoader.writer();
+    // bind the writer for columns
+    bindColumns(loader);
+    return true;
+  }
+
+  /**
+   * The default of the `stat` parameter is false,
+   * which means that the packet data is parsed and returned,
+   * but if true, will return the statistics data about the each pcapng file 
only
+   * (consist of the information about collect devices and the summary of the 
packet data above).
+   *
+   * In addition, a pcapng file contains a single Section Header Block (SHB),
+   * a single Interface Description Block (IDB) and a few Enhanced Packet 
Blocks (EPB).
+   * <pre>
+   * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   * | SHB | IDB | EPB | EPB |    ...    | EPB |
+   * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   * </pre>
+   * 
https://pcapng.github.io/pcapng/draft-tuexen-opsawg-pcapng.html#name-physical-file-layout
+   */
+  @Override
+  public boolean next() {
+    while (!loader.isFull()) {
+      while (pcapIterator.hasNext()) {

Review comment:
       I think this loop could be refactored, as it's a bit confusing.  
This function is intended to process one row. 
   
   The format I've generally used is like this:
   ```java
     @Override
     public boolean next() {
       while (!rowWriter.isFull()) {
         if (!processNextRow()) {
           return false;
         }
       }
       return true;
     }
   ```
   
   Then the `processNextRow()` function checks whether the limit has been 
reached, loads the data into the vectors, etc.   The important thing is that 
this function needs to return `true` when there is more data but the loader is 
full (so new batches are started) and it needs to return `false` when there is 
no more data. 
   
   I'm getting an IDE warning as well with the outer loop.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import fr.bmartel.pcapdecoder.PcapDecoder;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+
+public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(PcapngBatchReader.class);
+
+  private final PcapngFormatConfig config;
+  private final EasySubScan scan;
+  private final int maxRecords;
+  private CustomErrorContext errorContext;
+  private List<SchemaPath> columns;
+  private List<ColumnDefn> projectedColumns;
+  private Iterator<IPcapngType> pcapIterator;
+  private RowSetLoader loader;
+  private InputStream in;
+  private Path path;
+
+  public PcapngBatchReader(final PcapngFormatConfig config, final EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+    this.maxRecords = scan.getMaxRecords();
+    this.columns = scan.getColumns();
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    try {
+      // init InputStream for pcap file
+      errorContext = negotiator.parentErrorContext();
+      DrillFileSystem dfs = negotiator.fileSystem();
+      path = dfs.makeQualified(negotiator.split().getPath());
+      in = dfs.openPossiblyCompressedStream(path);
+      // decode the pcap file
+      PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
+      decoder.decode();
+      pcapIterator = decoder.getSectionList().iterator();
+      logger.debug("The config is {}, root is {}, columns has {}", config, 
scan.getSelectionRoot(), columns);
+    } catch (IOException e) {
+      throw UserException
+             .dataReadError(e)
+             .message("Failure in initial pcapng inputstream. " + 
e.getMessage())
+             .addContext(errorContext)
+             .build(logger);
+    } catch (Exception e) {
+      throw UserException
+             .dataReadError(e)
+             .message("Failed to decode the pcapng file. " + e.getMessage())
+             .addContext(errorContext)
+             .build(logger);
+    }
+    // define the schema
+    negotiator.tableSchema(defineMetadata(), true);
+    ResultSetLoader resultSetLoader = negotiator.build();
+    loader = resultSetLoader.writer();
+    // bind the writer for columns
+    bindColumns(loader);
+    return true;
+  }
+
+  /**
+   * The default of the `stat` parameter is false,
+   * which means that the packet data is parsed and returned,
+   * but if true, will return the statistics data about the each pcapng file 
only
+   * (consist of the information about collect devices and the summary of the 
packet data above).
+   *
+   * In addition, a pcapng file contains a single Section Header Block (SHB),
+   * a single Interface Description Block (IDB) and a few Enhanced Packet 
Blocks (EPB).
+   * <pre>
+   * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   * | SHB | IDB | EPB | EPB |    ...    | EPB |
+   * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   * </pre>
+   * 
https://pcapng.github.io/pcapng/draft-tuexen-opsawg-pcapng.html#name-physical-file-layout
+   */
+  @Override
+  public boolean next() {
+    while (!loader.isFull()) {
+      while (pcapIterator.hasNext()) {
+        IPcapngType block = pcapIterator.next();
+        if (config.getStat()) {
+          if (block instanceof IEnhancedPacketBLock) {
+            continue;
+          }
+        } else {
+          if (!(block instanceof IEnhancedPacketBLock)) {
+            continue;
+          }
+        }
+        loader.start();
+        for (ColumnDefn columnDefn : projectedColumns) {
+          // pcapng file name
+          if (columnDefn.getName().equals(PcapColumn.PATH_NAME)) {
+            columnDefn.load(path.getName());
+          } else {
+            columnDefn.load(block);
+          }
+        }
+        loader.save();
+        if (loader.limitReached(maxRecords)) {
+          return false;
+        }
+      }
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(in);
+  }
+
+  private boolean isSkipQuery() {
+    return columns.isEmpty();
+  }
+
+  private boolean isStarQuery() {
+    return Utilities.isStarQuery(columns);
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    processProjected(columns);
+    for (ColumnDefn columnDefn : projectedColumns) {
+      columnDefn.define(builder);
+    }
+    return builder.buildSchema();
+  }
+
+  /**
+   * <b> Define the schema based on projected </b><br/>
+   * 1. SkipQuery: no field specified, such as count(*) <br/>
+   * 2. StarQuery: select * <br/>
+   * 3. ProjectPushdownQuery: select a,b,c <br/>
+   */
+  private List<ColumnDefn> processProjected(List<SchemaPath> columns) {

Review comment:
       Should this function be `void`?  The return value is not used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


> Convert Pcapng to EVF
> ---------------------
>
>                 Key: DRILL-7533
>                 URL: https://issues.apache.org/jira/browse/DRILL-7533
>             Project: Apache Drill
>          Issue Type: Sub-task
>            Reporter: Arina Ielchiieva
>            Assignee: luocong
>            Priority: Major
>             Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to