[
https://issues.apache.org/jira/browse/DRILL-7533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256528#comment-17256528
]
ASF GitHub Bot commented on DRILL-7533:
---------------------------------------
cgivre commented on a change in pull request #2130:
URL: https://github.com/apache/drill/pull/2130#discussion_r550192801
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import fr.bmartel.pcapdecoder.PcapDecoder;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+
+public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PcapngBatchReader.class);
+
+ private final PcapngFormatConfig config;
+ private final EasySubScan scan;
+ private final int maxRecords;
+ private CustomErrorContext errorContext;
+ private List<SchemaPath> columns;
+ private List<ColumnDefn> projectedColumns;
+ private Iterator<IPcapngType> pcapIterator;
+ private RowSetLoader loader;
+ private InputStream in;
+ private Path path;
+
+  public PcapngBatchReader(final PcapngFormatConfig config, final EasySubScan scan) {
+ this.config = config;
+ this.scan = scan;
+ this.maxRecords = scan.getMaxRecords();
+ this.columns = scan.getColumns();
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ try {
+ // init InputStream for pcap file
+ errorContext = negotiator.parentErrorContext();
+ DrillFileSystem dfs = negotiator.fileSystem();
+ path = dfs.makeQualified(negotiator.split().getPath());
+ in = dfs.openPossiblyCompressedStream(path);
+ // decode the pcap file
+ PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
+ decoder.decode();
+ pcapIterator = decoder.getSectionList().iterator();
+      logger.debug("The config is {}, root is {}, columns are {}", config, scan.getSelectionRoot(), columns);
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+        .message("Failed to initialize the pcapng input stream. " + e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to decode the pcapng file. " + e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
+ }
+ // define the schema
+ negotiator.tableSchema(defineMetadata(), true);
+ ResultSetLoader resultSetLoader = negotiator.build();
+ loader = resultSetLoader.writer();
+ // bind the writer for columns
+ bindColumns(loader);
+ return true;
+ }
+
+  /**
+   * The default of the `stat` parameter is false, which means that the
+   * packet data is parsed and returned; if true, only the statistics data
+   * about each pcapng file is returned (consisting of information about
+   * the capture devices and a summary of the packet data above).
+   *
+   * In addition, a pcapng file contains a single Section Header Block (SHB),
+   * a single Interface Description Block (IDB) and a few Enhanced Packet Blocks (EPB).
+   * <pre>
+   * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   * | SHB | IDB | EPB | EPB | ... | EPB |
+   * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   * </pre>
+   *
+   * https://pcapng.github.io/pcapng/draft-tuexen-opsawg-pcapng.html#name-physical-file-layout
+   */
+ @Override
+ public boolean next() {
+ while (!loader.isFull()) {
+ if (pcapIterator.hasNext()) {
+ IPcapngType block = pcapIterator.next();
+ if (config.getStat()) {
Review comment:
@luocooong
Thanks for addressing the review comments. I have a few more minor
comments and we should be good to go. This particular logic here is a bit
difficult to follow. Could we refactor the if-statements so that it is a
little more clear?
I'm thinking (pseudocode)
```java
if (config.getStat() && isEnhancedPacketBlock()) {
  // do something
} else if (!config.getStat() && !isEnhancedPacketBlock()) {
  // do something else
}
```
My personal preference is to put the check to see if the parser has more
records at the beginning of the loop. So it would look something like this:
(pseudocode)
```java
while (!loader.isFull()) {
  // No more records
  if (!parser.hasNext()) {
    return false;
  } else if (config.getStat() && isEnhancedPacketBlock()) {
    // do something
  } else if (!config.getStat() && !isEnhancedPacketBlock()) {
    // do something else
  }
  // process the row
  loader.start();
}
```
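Fleshed out as a compilable, self-contained sketch: `Loader` and `Block` below are stand-in stubs for Drill's `RowSetLoader` and the pcapng block types (not the real API), and I'm assuming stat mode emits the non-packet blocks while packet mode emits the EPBs, per the class Javadoc.

```java
import java.util.Iterator;
import java.util.List;

// Stand-alone sketch of the suggested control flow for next().
public class NextLoopSketch {

  // Stub for RowSetLoader: fills up after a fixed number of rows.
  static class Loader {
    final int capacity;
    int rows = 0;
    Loader(int capacity) { this.capacity = capacity; }
    boolean isFull() { return rows >= capacity; }
    void writeRow() { rows++; }
  }

  // Stub for an IPcapngType block; only the EPB check matters here.
  interface Block { boolean isEnhancedPacket(); }

  // Returns false when the block iterator is exhausted, true when the
  // batch filled up with more data remaining, mirroring ManagedReader.next().
  static boolean next(Loader loader, Iterator<Block> blocks, boolean stat) {
    while (!loader.isFull()) {
      if (!blocks.hasNext()) {
        return false;                  // no more records
      }
      Block block = blocks.next();
      if (stat && !block.isEnhancedPacket()) {
        loader.writeRow();             // emit one statistics row
      } else if (!stat && block.isEnhancedPacket()) {
        loader.writeRow();             // emit one packet row
      }                                // otherwise skip the block
    }
    return true;                       // batch full, more to read
  }

  public static void main(String[] args) {
    // Three blocks: EPB, non-EPB, EPB; packet mode keeps the two EPBs.
    List<Block> data = List.of(() -> true, () -> false, () -> true);
    Loader loader = new Loader(10);
    boolean more = next(loader, data.iterator(), false);
    System.out.println(more + " " + loader.rows);  // false 2
  }
}
```

The early `hasNext()` check keeps the exhausted-input exit in one obvious place instead of burying it under the stat branching.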
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapColumn.java
##########
@@ -0,0 +1,1031 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.store.pcap.PcapFormatUtils;
+import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.joda.time.Instant;
+
+import fr.bmartel.pcapdecoder.structure.options.inter.IOptionsStatisticsHeader;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IDescriptionBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import fr.bmartel.pcapdecoder.structure.types.inter.INameResolutionBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.ISectionHeaderBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.IStatisticsBlock;
+
+public abstract class PcapColumn {
+
+ private static final Map<String, PcapColumn> columns = new LinkedHashMap<>();
+  private static final Map<String, PcapColumn> summary_columns = new LinkedHashMap<>();
+ public static final String DUMMY_NAME = "dummy";
+ public static final String PATH_NAME = "path";
+
+ static {
+ // Basic
+ columns.put("timestamp", new PcapTimestamp());
+ columns.put("packet_length", new PcapPacketLength());
+ columns.put("type", new PcapType());
+ columns.put("src_ip", new PcapSrcIp());
+ columns.put("dst_ip", new PcapDstIp());
+ columns.put("src_port", new PcapSrcPort());
+ columns.put("dst_port", new PcapDstPort());
+ columns.put("src_mac_address", new PcapSrcMac());
+ columns.put("dst_mac_address", new PcapDstMac());
+ columns.put("tcp_session", new PcapTcpSession());
+ columns.put("tcp_ack", new PcapTcpAck());
+ columns.put("tcp_flags", new PcapTcpFlags());
+ columns.put("tcp_flags_ns", new PcapTcpFlagsNs());
+ columns.put("tcp_flags_cwr", new PcapTcpFlagsCwr());
+ columns.put("tcp_flags_ece", new PcapTcpFlagsEce());
+ columns.put("tcp_flags_ece_ecn_capable", new PcapTcpFlagsEceEcnCapable());
+    columns.put("tcp_flags_ece_congestion_experienced", new PcapTcpFlagsEceCongestionExperienced());
+ columns.put("tcp_flags_urg", new PcapTcpFlagsUrg());
+ columns.put("tcp_flags_ack", new PcapTcpFlagsAck());
+ columns.put("tcp_flags_psh", new PcapTcpFlagsPsh());
+ columns.put("tcp_flags_rst", new PcapTcpFlagsRst());
+ columns.put("tcp_flags_syn", new PcapTcpFlagsSyn());
+ columns.put("tcp_flags_fin", new PcapTcpFlagsFin());
+ columns.put("tcp_parsed_flags", new PcapTcpParsedFlags());
+ columns.put("packet_data", new PcapPacketData());
Review comment:
@luocooong
I saw your response. Basically, what we did in the original reader was put
a lot of code in try/catch blocks and if it found errors, set `is_corrupt` to
`true` and move on to the next packet. Have you tested the PCAP-NG with a
corrupt packet? Does the `parse()` method throw an exception if the packet is
corrupt?
The reason I'm asking is that it is often better for the user to get a
result, rather than a huge java stacktrace, so if we can avoid that, it would
be good.
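For reference, the "flag, don't fail" pattern from the original pcap reader could be sketched like this. `DecodedRow`, `decodeSrcIp`, and `readPacket` are hypothetical stand-ins, not the actual pcapdecoder or Drill API; the point is only the shape of the try/catch.

```java
// Sketch: wrap the per-packet decode in try/catch and surface an
// is_corrupt column instead of letting the exception reach the user.
public class CorruptPacketSketch {

  // Stand-in for one emitted row.
  static class DecodedRow {
    final boolean isCorrupt;
    final String srcIp;   // null when the packet could not be parsed
    DecodedRow(boolean isCorrupt, String srcIp) {
      this.isCorrupt = isCorrupt;
      this.srcIp = srcIp;
    }
  }

  // Pretend decoder: throws on malformed input, like a strict parse() might.
  static String decodeSrcIp(byte[] raw) {
    if (raw == null || raw.length < 4) {
      throw new IllegalArgumentException("truncated packet");
    }
    return (raw[0] & 0xFF) + "." + (raw[1] & 0xFF) + "."
         + (raw[2] & 0xFF) + "." + (raw[3] & 0xFF);
  }

  static DecodedRow readPacket(byte[] raw) {
    try {
      return new DecodedRow(false, decodeSrcIp(raw));
    } catch (Exception e) {
      // Corrupt packet: record the fact and move on to the next packet,
      // so the user gets a result set rather than a stack trace.
      return new DecodedRow(true, null);
    }
  }

  public static void main(String[] args) {
    DecodedRow ok = readPacket(new byte[] {10, 0, 0, 1});
    DecodedRow bad = readPacket(new byte[] {1, 2});
    System.out.println(ok.isCorrupt + " " + ok.srcIp);   // false 10.0.0.1
    System.out.println(bad.isCorrupt + " " + bad.srcIp); // true null
  }
}
```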
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Convert Pcapng to EVF
> ---------------------
>
> Key: DRILL-7533
> URL: https://issues.apache.org/jira/browse/DRILL-7533
> Project: Apache Drill
> Issue Type: Sub-task
> Reporter: Arina Ielchiieva
> Assignee: luocong
> Priority: Major
> Fix For: 1.19.0
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)