[ 
https://issues.apache.org/jira/browse/DRILL-6179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463355#comment-16463355
 ] 

ASF GitHub Bot commented on DRILL-6179:
---------------------------------------

paul-rogers commented on a change in pull request #1126: DRILL-6179: Added 
pcapng-format support
URL: https://github.com/apache/drill/pull/1126#discussion_r185987171
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
 ##########
 @@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.pcapng.decoder.Decoder;
+import org.apache.drill.exec.store.pcapng.decoder.Packet;
+import 
org.apache.drill.exec.store.pcapng.decoder.structure.IEnhancedPacketBLock;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
+
+public class PcapngRecordReader extends AbstractRecordReader {
+  private static final Logger logger = LoggerFactory.getLogger(PcapngRecordReader.class);
+
+  // Rows emitted per batch; kept below the maximum record count allowed in one batch.
+  private static final int BATCH_SIZE = 40_000;
+
+  // Maps pcapng column types to the Drill minor types used for their (nullable) vectors.
+  private static final Map<PcapngTypes, TypeProtos.MinorType> TYPES = ImmutableMap.of(
+      PcapngTypes.STRING, TypeProtos.MinorType.VARCHAR,
+      PcapngTypes.INTEGER, TypeProtos.MinorType.INT,
+      PcapngTypes.LONG, TypeProtos.MinorType.BIGINT,
+      PcapngTypes.TIMESTAMP, TypeProtos.MinorType.TIMESTAMP);
+
+  // File-system-qualified path of the file being read.
+  private final Path pathToFile;
+  private OutputMutator output;
+  private ImmutableList<ProjectedColumnInfo> projectedCols;
+  private FileSystem fs;
+  private FSDataInputStream in;
+  private List<SchemaPath> projectedColumns;
+
+  // Iterator over the blocks returned by Decoder.getPcapngSectionList().
+  private Iterator<IEnhancedPacketBLock> it;
+
+
+  public PcapngRecordReader(final String pathToFile,
+                            final FileSystem fileSystem,
+                            final List<SchemaPath> projectedColumns) {
+    this.fs = fileSystem;
+    this.pathToFile = fs.makeQualified(new Path(pathToFile));
+    this.projectedColumns = projectedColumns;
+  }
+
+  /**
+   * Opens the file, starts decoding it and registers the output columns.
+   *
+   * @param context operator context (unused here)
+   * @param output mutator that receives the value vectors for each column
+   * @throws UserException a data-read error if opening or decoding the file raises an IOException
+   */
+  @Override
+  public void setup(final OperatorContext context, final OutputMutator output)
+      throws ExecutionSetupException {
+    this.output = output;
+    try {
+      in = fs.open(pathToFile);
+      it = new Decoder(in).getPcapngSectionList().iterator();
+      projectedCols = getProjectedColsIfItNull();
+      setColumns(projectedColumns);
+    } catch (IOException e) {
+      throw UserException.dataReadError(e)
+          .addContext("File name:", pathToFile.toUri().getPath())
+          .build(logger);
+    }
+  }
+
+  /**
+   * Reads the next batch of records.
+   *
+   * @return the record count produced by parsePcapngFilesAndPutItToTable() for this batch
+   */
+  @Override
+  public int next() {
+    final int recordCount = parsePcapngFilesAndPutItToTable();
+    return recordCount;
+  }
+
+  /**
+   * Closes the underlying input stream, if one was opened.
+   *
+   * <p>{@code in} is still {@code null} when {@link #setup} was never called or when
+   * {@code fs.open} failed there, so it is checked before closing. The reference is
+   * cleared afterwards so that a repeated call is a no-op.
+   */
+  @Override
+  public void close() throws Exception {
+    if (in != null) {
+      in.close();
+      in = null;
+    }
+  }
+
+  /**
+   * Returns {@code projectedCols} when it is already set; otherwise builds a new column
+   * list from a fresh {@link Schema}. The result is not cached here; callers assign it.
+   */
+  private ImmutableList<ProjectedColumnInfo> getProjectedColsIfItNull() {
+    if (projectedCols == null) {
+      return initCols(new Schema());
+    }
+    return projectedCols;
+  }
+
+  /**
+   * Creates a nullable output column for every column declared by {@code schema}.
+   *
+   * @param schema source of column names and their pcapng types
+   * @return the created columns, in the schema's iteration order
+   * @throws IllegalStateException if a column's pcapng type has no entry in {@code TYPES}
+   */
+  private ImmutableList<ProjectedColumnInfo> initCols(final Schema schema) {
+    ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
+
+    // Walk entries directly rather than looking each key up again with get().
+    for (Map.Entry<String, PcapngTypes> column : schema.getColumns().entrySet()) {
+      final String columnName = column.getKey();
+      final PcapngTypes type = column.getValue();
+      final TypeProtos.MinorType minorType = TYPES.get(type);
+      if (minorType == null) {
+        // Fail fast with context instead of passing a null type into vector creation.
+        throw new IllegalStateException(
+            "Unsupported pcapng type " + type + " for column " + columnName);
+      }
+      pciBuilder.add(getProjectedColumnInfo(columnName, minorType));
+    }
+    return pciBuilder.build();
+  }
+
+  private ProjectedColumnInfo getProjectedColumnInfo(final String name,
+                                                     final 
TypeProtos.MinorType minorType) {
+    TypeProtos.MajorType majorType = getMajorType(minorType);
+
+    MaterializedField field =
+        MaterializedField.create(name, majorType);
+
+    ValueVector vector =
+        getValueVector(minorType, majorType, field);
+
+    return getProjectedColumnInfo(name, vector);
+  }
+
+  private ProjectedColumnInfo getProjectedColumnInfo(final String column, 
final ValueVector vector) {
+    ProjectedColumnInfo pci = new ProjectedColumnInfo();
+    pci.vv = vector;
+    pci.columnName = column;
+    return pci;
+  }
+
+  private TypeProtos.MajorType getMajorType(final TypeProtos.MinorType 
minorType) {
+    return Types.optional(minorType);
+  }
+
+  private ValueVector getValueVector(final TypeProtos.MinorType minorType,
+                                     final TypeProtos.MajorType majorType,
+                                     final MaterializedField field) {
+    try {
+
+      final Class<? extends ValueVector> clazz = 
TypeHelper.getValueVectorClass(
+          minorType, majorType.getMode());
+      ValueVector vector = output.addField(field, clazz);
+      vector.allocateNew();
+      return vector;
+
+    } catch (SchemaChangeException sce) {
+      throw new IllegalStateException("The addition of this field is 
incompatible with this OutputMutator's capabilities");
 
 Review comment:
   To ensure a good error message, consider using `UserException.systemError()` 
with a message that says that something went wrong internally: ask for help or 
some such.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Added pcapng-format support
> ---------------------------
>
>                 Key: DRILL-6179
>                 URL: https://issues.apache.org/jira/browse/DRILL-6179
>             Project: Apache Drill
>          Issue Type: New Feature
>    Affects Versions: 1.13.0
>            Reporter: Vlad
>            Assignee: Vlad
>            Priority: Major
>              Labels: doc-impacting
>             Fix For: 1.14.0
>
>
> The _PCAP Next Generation Dump File Format_ (or pcapng for short) [1] is an 
> attempt to overcome the limitations of the currently widely used (but 
> limited) libpcap format.
> At a first level, it is desirable to query and filter by source and 
> destination IP and port, by source/destination MAC addresses, or by protocol. Beyond 
> that, however, it would be very useful to be able to group packets by TCP 
> session and eventually to look at packet contents.
> Initial work is available at  
> https://github.com/mapr-demos/drill/tree/pcapng_dev
> [1] https://pcapng.github.io/pcapng/
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to