This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 2dfd0da DRILL-6179: Added pcapng-format support
2dfd0da is described below
commit 2dfd0dab41864f22c7ed924e17c6a8cf9b2f54ad
Author: Vlad Storona <[email protected]>
AuthorDate: Thu Jun 21 16:41:17 2018 +0300
DRILL-6179: Added pcapng-format support
---
.../native/client/src/protobuf/UserBitShared.pb.cc | 13 +-
.../native/client/src/protobuf/UserBitShared.pb.h | 5 +-
exec/java-exec/pom.xml | 5 +
.../drill/exec/store/pcap/decoder/Packet.java | 16 +-
.../exec/store/pcapng/PcapngFormatConfig.java | 52 +++
.../exec/store/pcapng/PcapngFormatPlugin.java | 76 ++++
.../exec/store/pcapng/PcapngRecordReader.java | 214 ++++++++++
.../exec/store/pcapng/decoder/PacketDecoder.java | 61 +++
.../drill/exec/store/pcapng/package-info.java | 23 ++
.../drill/exec/store/pcapng/schema/Column.java | 28 ++
.../exec/store/pcapng/schema/DummyArrayImpl.java | 34 ++
.../drill/exec/store/pcapng/schema/DummyImpl.java | 34 ++
.../drill/exec/store/pcapng/schema/Schema.java | 441 +++++++++++++++++++++
.../drill/exec/store/pcapng/schema/Util.java | 59 +++
.../main/resources/bootstrap-storage-plugins.json | 3 +
.../store/dfs/TestFormatPluginOptionExtractor.java | 1 +
.../drill/exec/store/pcapng/TestPcapngHeaders.java | 212 ++++++++++
.../exec/store/pcapng/TestPcapngRecordReader.java | 100 +++++
.../src/test/resources/store/pcapng/example.pcapng | Bin 0 -> 512 bytes
.../src/test/resources/store/pcapng/sniff.pcapng | Bin 0 -> 33464 bytes
.../org/apache/drill/exec/proto/UserBitShared.java | 21 +-
.../drill/exec/proto/beans/CoreOperatorType.java | 4 +-
protocol/src/main/protobuf/UserBitShared.proto | 1 +
23 files changed, 1380 insertions(+), 23 deletions(-)
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index 7398048..8f98e06 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -750,7 +750,7 @@ void protobuf_AddDesc_UserBitShared_2eproto() {
"TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020"
"\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022"
"\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005"
- "\022\032\n\026CANCELLATION_REQUESTED\020\006*\316\010\n\020CoreOpe"
+ "\022\032\n\026CANCELLATION_REQUESTED\020\006*\343\010\n\020CoreOpe"
"ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS"
"T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE"
"\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS"
@@ -778,11 +778,11 @@ void protobuf_AddDesc_UserBitShared_2eproto() {
"ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI"
"TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S"
"UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART"
- "ITION_LIMIT\0206*g\n\nSaslStatus\022\020\n\014SASL_UNKN"
- "OWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRES"
-
"S\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B."
- "\n\033org.apache.drill.exec.protoB\rUserBitSh"
- "aredH\001", 5406);
+ "ITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207*g\n\nSa"
+ "slStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START"
+ "\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS"
+ "\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill."
+ "exec.protoB\rUserBitSharedH\001", 5427);
::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
"UserBitShared.proto", &protobuf_RegisterTypes);
UserCredentials::default_instance_ = new UserCredentials();
@@ -958,6 +958,7 @@ bool CoreOperatorType_IsValid(int value) {
case 52:
case 53:
case 54:
+ case 55:
return true;
default:
return false;
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h
b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index 4599abb..a07cbfa 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -258,11 +258,12 @@ enum CoreOperatorType {
HTPPD_LOG_SUB_SCAN = 51,
IMAGE_SUB_SCAN = 52,
SEQUENCE_SUB_SCAN = 53,
- PARTITION_LIMIT = 54
+ PARTITION_LIMIT = 54,
+ PCAPNG_SUB_SCAN = 55
};
bool CoreOperatorType_IsValid(int value);
const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER;
-const CoreOperatorType CoreOperatorType_MAX = PARTITION_LIMIT;
+const CoreOperatorType CoreOperatorType_MAX = PCAPNG_SUB_SCAN;
const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1;
const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor();
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index f175c65..f406895 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -534,6 +534,11 @@
<artifactId>metadata-extractor</artifactId>
<version>2.11.0</version>
</dependency>
+ <dependency>
+ <groupId>fr.bmartel</groupId>
+ <artifactId>pcapngdecoder</artifactId>
+ <version>1.2</version>
+ </dependency>
</dependencies>
<profiles>
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
index 9cc98de..a0a07a9 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
@@ -42,18 +42,18 @@ public class Packet {
private long timestamp;
private int originalLength;
- private byte[] raw;
+ protected byte[] raw;
// index into the raw data where the current ethernet packet starts
private int etherOffset;
// index into the raw data where the current IP packet starts. Should be
just after etherOffset
- private int ipOffset;
+ protected int ipOffset;
private int packetLength;
- private int etherProtocol;
- private int protocol;
+ protected int etherProtocol;
+ protected int protocol;
- private boolean isRoutingV6;
+ protected boolean isRoutingV6;
@SuppressWarnings("WeakerAccess")
public boolean readPcap(final InputStream in, final boolean byteOrder, final
int maxLength) throws IOException {
@@ -379,7 +379,7 @@ public class Packet {
return (getByte(raw, ipOffset) & 0xf) * 4;
}
- private int ipVersion() {
+ protected int ipVersion() {
return getByte(raw, ipOffset) >>> 4;
}
@@ -409,12 +409,12 @@ public class Packet {
// everything is decoded lazily
}
- private int processIpV4Packet() {
+ protected int processIpV4Packet() {
validateIpV4Packet();
return getByte(raw, ipOffset + 9);
}
- private int processIpV6Packet() {
+ protected int processIpV6Packet() {
Preconditions.checkState(ipVersion() == 6, "Should have seen IP version 6,
got %d", ipVersion());
int headerLength = 40;
int nextHeader = raw[ipOffset + 6] & 0xff;
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
new file mode 100644
index 0000000..7ff875a
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+@JsonTypeName("pcapng")
+public class PcapngFormatConfig implements FormatPluginConfig {
+
+ public List<String> extensions = Collections.singletonList("pcapng");
+
+ public List<String> getExtensions() {
+ return extensions;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PcapngFormatConfig that = (PcapngFormatConfig) o;
+ return Objects.equals(extensions, that.extensions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(extensions);
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
new file mode 100644
index 0000000..832c0ec
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+
+public class PcapngFormatPlugin extends EasyFormatPlugin<PcapngFormatConfig> {
+
+ public static final String DEFAULT_NAME = "pcapng";
+
+ public PcapngFormatPlugin(String name, DrillbitContext context,
Configuration fsConf,
+ StoragePluginConfig storagePluginConfig) {
+ this(name, context, fsConf, storagePluginConfig, new PcapngFormatConfig());
+ }
+
+ public PcapngFormatPlugin(String name, DrillbitContext context,
Configuration fsConf, StoragePluginConfig config, PcapngFormatConfig
formatPluginConfig) {
+ super(name, context, fsConf, config, formatPluginConfig, true,
+ false, true, false,
+ formatPluginConfig.getExtensions(), DEFAULT_NAME);
+ }
+
+ @Override
+ public boolean supportsPushDown() {
+ return true;
+ }
+
+ @Override
+ public RecordReader getRecordReader(FragmentContext context, DrillFileSystem
dfs,
+ FileWork fileWork, List<SchemaPath>
columns,
+ String userName) {
+ return new PcapngRecordReader(fileWork.getPath(), dfs, columns);
+ }
+
+ @Override
+ public RecordWriter getRecordWriter(FragmentContext context, EasyWriter
writer) {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+
+ @Override
+ public int getReaderOperatorType() {
+ return UserBitShared.CoreOperatorType.PCAPNG_SUB_SCAN_VALUE;
+ }
+
+ @Override
+ public int getWriterOperatorType() {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
new file mode 100644
index 0000000..b1c5f24
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import fr.bmartel.pcapdecoder.PcapDecoder;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.pcapng.schema.Column;
+import org.apache.drill.exec.store.pcapng.schema.DummyArrayImpl;
+import org.apache.drill.exec.store.pcapng.schema.DummyImpl;
+import org.apache.drill.exec.store.pcapng.schema.Schema;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+public class PcapngRecordReader extends AbstractRecordReader {
+ private static final Logger logger =
LoggerFactory.getLogger(PcapngRecordReader.class);
+
+ // batch size should not exceed max allowed record count
+ private static final int BATCH_SIZE = 40_000;
+
+ private final Path pathToFile;
+ private OutputMutator output;
+ private List<ProjectedColumnInfo> projectedCols;
+ private FileSystem fs;
+ private FSDataInputStream in;
+ private List<SchemaPath> columns;
+
+ private Iterator<IPcapngType> it;
+
+ public PcapngRecordReader(final String pathToFile,
+ final FileSystem fileSystem,
+ final List<SchemaPath> columns) {
+ this.fs = fileSystem;
+ this.pathToFile = fs.makeQualified(new Path(pathToFile));
+ this.columns = columns;
+ setColumns(columns);
+ }
+
+ @Override
+ public void setup(final OperatorContext context, final OutputMutator output)
throws ExecutionSetupException {
+ try {
+
+ this.output = output;
+ this.in = fs.open(pathToFile);
+ PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
+ decoder.decode();
+ this.it = decoder.getSectionList().iterator();
+ setupProjection();
+ } catch (IOException io) {
+ throw UserException.dataReadError(io)
+ .addContext("File name:", pathToFile.toUri().getPath())
+ .build(logger);
+ }
+ }
+
+ @Override
+ public int next() {
+ if (isSkipQuery()) {
+ return iterateOverBlocks((block, counter) -> {
+ });
+ } else {
+ return iterateOverBlocks((block, counter) ->
putToTable((IEnhancedPacketBLock) block, counter));
+ }
+ }
+
+ private void putToTable(IEnhancedPacketBLock bLock, Integer counter) {
+ for (ProjectedColumnInfo pci : projectedCols) {
+ pci.getColumn().process(bLock, pci.getVv(), counter);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (in != null) {
+ in.close();
+ in = null;
+ }
+ }
+
+ private void setupProjection() {
+ if (isSkipQuery()) {
+ projectedCols = projectNone();
+ } else if (isStarQuery()) {
+ projectedCols = projectAllCols(Schema.getColumnsNames());
+ } else {
+ projectedCols = projectCols(columns);
+ }
+ }
+
+ private List<ProjectedColumnInfo> projectNone() {
+ List<ProjectedColumnInfo> pciBuilder = new ArrayList<>();
+ pciBuilder.add(makeColumn("dummy", new DummyImpl()));
+ return Collections.unmodifiableList(pciBuilder);
+ }
+
+ private List<ProjectedColumnInfo> projectAllCols(final Set<String> columns) {
+ List<ProjectedColumnInfo> pciBuilder = new ArrayList<>();
+ for (String colName : columns) {
+ pciBuilder.add(makeColumn(colName, Schema.getColumns().get(colName)));
+ }
+ return Collections.unmodifiableList(pciBuilder);
+ }
+
+ private List<ProjectedColumnInfo> projectCols(final List<SchemaPath>
columns) {
+ List<ProjectedColumnInfo> pciBuilder = new ArrayList<>();
+ for (SchemaPath schemaPath : columns) {
+ String projectedName = schemaPath.rootName();
+ if (schemaPath.isArray()) {
+ pciBuilder.add(makeColumn(projectedName, new DummyArrayImpl()));
+ } else if (Schema.getColumns().containsKey(projectedName.toLowerCase()))
{
+ pciBuilder.add(makeColumn(projectedName,
+ Schema.getColumns().get(projectedName.toLowerCase())));
+ } else {
+ pciBuilder.add(makeColumn(projectedName, new DummyImpl()));
+ }
+ }
+ return Collections.unmodifiableList(pciBuilder);
+ }
+
+ private ProjectedColumnInfo makeColumn(final String colName, final Column
column) {
+ MaterializedField field = MaterializedField.create(colName,
column.getMinorType());
+ ValueVector vector = getValueVector(field, output);
+ return new ProjectedColumnInfo(vector, column, colName);
+ }
+
+ private ValueVector getValueVector(final MaterializedField field, final
OutputMutator output) {
+ try {
+ TypeProtos.MajorType majorType = field.getType();
+ final Class<? extends ValueVector> clazz =
TypeHelper.getValueVectorClass(
+ majorType.getMinorType(), majorType.getMode());
+
+ return output.addField(field, clazz);
+ } catch (SchemaChangeException sce) {
+ throw UserException.internalError(sce)
+ .addContext("The addition of this field is incompatible with this
OutputMutator's capabilities")
+ .build(logger);
+ }
+ }
+
+ private Integer iterateOverBlocks(BiConsumer<IPcapngType, Integer> consumer)
{
+ int counter = 0;
+ while (it.hasNext() && counter < BATCH_SIZE) {
+ IPcapngType block = it.next();
+ if (block instanceof IEnhancedPacketBLock) {
+ consumer.accept(block, counter);
+ counter++;
+ }
+ }
+ return counter;
+ }
+
+ private static class ProjectedColumnInfo {
+
+ private ValueVector vv;
+ private Column colDef;
+ private String columnName;
+
+ ProjectedColumnInfo(ValueVector vv, Column colDef, String columnName) {
+ this.vv = vv;
+ this.colDef = colDef;
+ this.columnName = columnName;
+ }
+
+ public ValueVector getVv() {
+ return vv;
+ }
+
+ Column getColumn() {
+ return colDef;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java
new file mode 100644
index 0000000..ea5d831
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng.decoder;
+
+import org.apache.drill.exec.store.pcap.decoder.Packet;
+import org.apache.drill.exec.store.pcap.decoder.PacketConstants;
+
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getByte;
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getShort;
+
+public class PacketDecoder extends Packet {
+
+ @SuppressWarnings("WeakerAccess")
+ public boolean readPcapng(final byte[] raw) {
+ this.raw = raw;
+ return decodeEtherPacket();
+ }
+
+ private boolean decodeEtherPacket() {
+ etherProtocol = getShort(raw, PacketConstants.PACKET_PROTOCOL_OFFSET);
+ ipOffset = PacketConstants.IP_OFFSET;
+ if (isIpV4Packet()) {
+ protocol = processIpV4Packet();
+ return true;
+ } else if (isIpV6Packet()) {
+ int tmp = processIpV6Packet();
+ if (tmp != -1) {
+ protocol = tmp;
+ }
+ return true;
+ } else if (isPPPoV6Packet()) {
+ protocol = getByte(raw, 48);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected int processIpV6Packet() {
+ try {
+ return super.processIpV6Packet();
+ } catch (IllegalStateException ise) {
+ return -1;
+ }
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java
new file mode 100644
index 0000000..dafeaa3
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * For comments on realization of this format plugin look at :
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/DRILL-6179"> Jira</a>
+ */
+package org.apache.drill.exec.store.pcapng;
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java
new file mode 100644
index 0000000..109b7dd
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng.schema;
+
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.vector.ValueVector;
+
+public interface Column {
+ TypeProtos.MajorType getMinorType();
+
+ void process(IEnhancedPacketBLock block, ValueVector vv, int count);
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java
new file mode 100644
index 0000000..2023d19
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng.schema;
+
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.ValueVector;
+
+public class DummyArrayImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.repeated(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java
new file mode 100644
index 0000000..a8c26a0
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng.schema;
+
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.ValueVector;
+
+public class DummyImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java
new file mode 100644
index 0000000..a9738bd
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng.schema;
+
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
+import static
org.apache.drill.exec.store.pcapng.schema.Util.setNullableLongColumnValue;
+
+public class Schema {
+
+ private final static Map<String, Column> columns = new HashMap<>();
+
+ static {
+ columns.put("timestamp", new TimestampImpl());
+ columns.put("packet_length", new PacketLenImpl());
+ columns.put("type", new TypeImpl());
+ columns.put("src_ip", new SrcIpImpl());
+ columns.put("dst_ip", new DstIpImpl());
+ columns.put("src_port", new SrcPortImpl());
+ columns.put("dst_port", new DstPortImpl());
+ columns.put("src_mac_address", new SrcMacImpl());
+ columns.put("dst_mac_address", new DstMacImpl());
+ columns.put("tcp_session", new TcpSessionImpl());
+ columns.put("tcp_ack", new TcpAckImpl());
+ columns.put("tcp_flags", new TcpFlags());
+ columns.put("tcp_flags_ns", new TcpFlagsNsImpl());
+ columns.put("tcp_flags_cwr", new TcpFlagsCwrImpl());
+ columns.put("tcp_flags_ece", new TcpFlagsEceImpl());
+ columns.put("tcp_flags_ece_ecn_capable", new TcpFlagsEceEcnCapableImpl());
+ columns.put("tcp_flags_ece_congestion_experienced", new
TcpFlagsEceCongestionExperiencedImpl());
+ columns.put("tcp_flags_urg", new TcpFlagsUrgIml());
+ columns.put("tcp_flags_ack", new TcpFlagsAckImpl());
+ columns.put("tcp_flags_psh", new TcpFlagsPshImpl());
+ columns.put("tcp_flags_rst", new TcpFlagsRstImpl());
+ columns.put("tcp_flags_syn", new TcpFlagsSynImpl());
+ columns.put("tcp_flags_fin", new TcpFlagsFinImpl());
+ columns.put("tcp_parsed_flags", new TcpParsedFlags());
+ columns.put("packet_data", new PacketDataImpl());
+ }
+
+ public static Map<String, Column> getColumns() {
+ return columns;
+ }
+
+ public static Set<String> getColumnsNames() {
+ return columns.keySet();
+ }
+
+ static class TimestampImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.required(TypeProtos.MinorType.TIMESTAMP);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ Util.setTimestampColumnValue(block.getTimeStamp(), vv, count);
+ }
+ }
+
+ static class PacketLenImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.required(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ Util.setIntegerColumnValue(block.getPacketLength(), vv, count);
+ }
+ }
+
+ static class TypeImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(packet.getPacketType(), vv, count);
+ }
+ }
+ }
+
+ static class SrcIpImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(packet.getSrc_ip().getHostAddress(),
vv, count);
+ }
+ }
+ }
+
+ static class DstIpImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(packet.getDst_ip().getHostAddress(),
vv, count);
+ }
+ }
+ }
+
+ static class SrcPortImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableIntegerColumnValue(packet.getSrc_port(), vv, count);
+ }
+ }
+ }
+
+ static class DstPortImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableIntegerColumnValue(packet.getDst_port(), vv, count);
+ }
+ }
+ }
+
+ static class SrcMacImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(packet.getEthernetSource(), vv,
count);
+ }
+ }
+ }
+
+ static class DstMacImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(packet.getEthernetDestination(), vv,
count);
+ }
+ }
+ }
+
+ static class TcpSessionImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.BIGINT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ setNullableLongColumnValue(packet.getSessionHash(), vv, count);
+ }
+ }
+ }
+
+ static class TcpAckImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableIntegerColumnValue(packet.getAckNumber(), vv, count);
+ }
+ }
+ }
+
+ static class TcpFlags implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableIntegerColumnValue(packet.getFlags(), vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsNsImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x100) != 0,
vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsCwrImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x80) != 0,
vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsEceImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x40) != 0,
vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsEceEcnCapableImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x42,
vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsEceCongestionExperiencedImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x40,
vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsUrgIml implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x20) != 0,
vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsAckImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x10) != 0,
vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsPshImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x8) != 0, vv,
count);
+ }
+ }
+ }
+
+ static class TcpFlagsRstImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x4) != 0, vv,
count);
+ }
+ }
+ }
+
+ static class TcpFlagsSynImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x2) != 0, vv,
count);
+ }
+ }
+ }
+
+ static class TcpFlagsFinImpl implements Column {
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x1) != 0, vv,
count);
+ }
+ }
+ }
+
+ static class TcpParsedFlags implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(packet.getParsedFlags(), vv, count);
+ }
+ }
+ }
+
+ static class PacketDataImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+
Util.setNullableStringColumnValue(parseBytesToASCII(block.getPacketData()), vv,
count);
+ }
+ }
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java
new file mode 100644
index 0000000..06e8e6a
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng.schema;
+
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.TimeStampVector;
+import org.apache.drill.exec.vector.ValueVector;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class Util {
+ static void setNullableIntegerColumnValue(final int data, final ValueVector
vv, final int count) {
+ ((NullableIntVector.Mutator) vv.getMutator())
+ .setSafe(count, data);
+ }
+
+ static void setIntegerColumnValue(final int data, final ValueVector vv,
final int count) {
+ ((IntVector.Mutator) vv.getMutator())
+ .setSafe(count, data);
+ }
+
+ static void setTimestampColumnValue(final long data, final ValueVector vv,
final int count) {
+ ((TimeStampVector.Mutator) vv.getMutator())
+ .setSafe(count, data / 1000);
+ }
+
+ static void setNullableLongColumnValue(final long data, final ValueVector
vv, final int count) {
+ ((NullableBigIntVector.Mutator) vv.getMutator())
+ .setSafe(count, data);
+ }
+
+ static void setNullableStringColumnValue(final String data, final
ValueVector vv, final int count) {
+ ((NullableVarCharVector.Mutator) vv.getMutator())
+ .setSafe(count, data.getBytes(UTF_8), 0, data.length());
+ }
+
+ static void setNullableBooleanColumnValue(final boolean data, final
ValueVector vv, final int count) {
+ ((NullableIntVector.Mutator) vv.getMutator())
+ .setSafe(count, data ? 1 : 0);
+ }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index 42cddd8..46f1620 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -46,6 +46,9 @@
"pcap" : {
type: "pcap"
},
+ "pcapng" : {
+ type: "pcapng"
+ },
"avro" : {
type: "avro"
},
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index f51fe4c..e53c394 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -59,6 +59,7 @@ public class TestFormatPluginOptionExtractor {
case "json":
case "sequencefile":
case "pcap":
+ case "pcapng":
case "avro":
assertEquals(d.typeName, "(type: String)", d.presentParams());
break;
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java
new file mode 100644
index 0000000..5dcffa9
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+
+public class TestPcapngHeaders extends ClusterTest {
+ @BeforeClass
+ public static void setupTestFiles() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
+ dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng"));
+ }
+
+ @Test
+ public void testValidHeadersForStarQuery() throws IOException {
+ String query = "select * from dfs.`store/pcapng/sniff.pcapng`";
+ RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+ TupleSchema expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("tcp_flags_ece_ecn_capable",
Types.optional(TypeProtos.MinorType.INT)));
+
expectedSchema.add(MaterializedField.create("tcp_flags_ece_congestion_experienced",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_psh",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("type",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_cwr",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("dst_ip",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("src_ip",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_fin",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_ece",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_ack",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("src_mac_address",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_syn",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_rst",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("timestamp",
Types.required(TypeProtos.MinorType.TIMESTAMP)));
+ expectedSchema.add(MaterializedField.create("tcp_session",
Types.optional(TypeProtos.MinorType.BIGINT)));
+ expectedSchema.add(MaterializedField.create("packet_data",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("tcp_parsed_flags",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_ns",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("src_port",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("packet_length",
Types.required(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_urg",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_ack",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("dst_port",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("dst_mac_address",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ }
+
+ @Test
+ public void testValidHeadersForProjection() throws IOException {
+ String query = "select sRc_ip, dst_IP, dst_mAc_address, src_Port,
tcp_session, `Timestamp` from dfs.`store/pcapng/sniff.pcapng`";
+ RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+ TupleSchema expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("sRc_ip",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("dst_IP",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("dst_mAc_address",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("src_Port",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_session",
Types.optional(TypeProtos.MinorType.BIGINT)));
+ expectedSchema.add(MaterializedField.create("Timestamp",
Types.required(TypeProtos.MinorType.TIMESTAMP)));
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ }
+
+ @Test
+ public void testValidHeadersForMissColumns() throws IOException {
+ String query = "select `timestamp`, `name`, `color` from
dfs.`store/pcapng/sniff.pcapng`";
+ RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+ TupleSchema expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("timestamp",
Types.required(TypeProtos.MinorType.TIMESTAMP)));
+ expectedSchema.add(MaterializedField.create("name",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("color",
Types.optional(TypeProtos.MinorType.INT)));
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ }
+
+ @Test
+ public void testMixColumns() throws IOException {
+ String query = "select src_ip, dst_ip, dst_mac_address, src_port,
tcp_session, `timestamp` from dfs.`store/pcapng/sniff.pcapng`";
+ RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+ TupleSchema expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("sRc_ip",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("dst_IP",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("dst_mAc_address",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("src_Port",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_session",
Types.optional(TypeProtos.MinorType.BIGINT)));
+ expectedSchema.add(MaterializedField.create("Timestamp",
Types.required(TypeProtos.MinorType.TIMESTAMP)));
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+
+ String queryWithDiffOrder = "select `timestamp`, src_ip, dst_ip, src_port,
tcp_session, dst_mac_address from dfs.`store/pcapng/sniff.pcapng`";
+ actual = client.queryBuilder().sql(queryWithDiffOrder).rowSet();
+
+ expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("timestamp",
Types.required(TypeProtos.MinorType.TIMESTAMP)));
+ expectedSchema.add(MaterializedField.create("src_ip",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("dst_ip",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("src_port",
Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_session",
Types.optional(TypeProtos.MinorType.BIGINT)));
+ expectedSchema.add(MaterializedField.create("dst_mac_address",
Types.optional(TypeProtos.MinorType.VARCHAR)));
+
+ expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ }
+
+ @Test
+ public void testValidHeaderForArrayColumns() throws IOException {
+ // query with non-existent field
+ String query = "select arr[3] as arr from dfs.`store/pcapng/sniff.pcapng`";
+ RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+ TupleSchema expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("arr",
Types.optional(TypeProtos.MinorType.INT)));
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+
+ // query with an existent field which doesn't support arrays
+ query = "select type[45] as arr from dfs.`store/pcapng/sniff.pcapng`";
+
+ expectedSchema = new TupleSchema();
+ actual = client.queryBuilder().sql(query).rowSet();
+
+ expectedSchema.add(MaterializedField.create("arr",
Types.optional(TypeProtos.MinorType.INT)));
+
+ expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ }
+
+ @Test
+ public void testValidHeaderForNestedColumns() throws IOException {
+ // query with non-existent field
+ String query = "select top['nested'] as nested from
dfs.`store/pcapng/sniff.pcapng`";
+ RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+ TupleSchema expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("nested",
Types.optional(TypeProtos.MinorType.INT)));
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+
+ // query with an existent field which doesn't support nesting
+ query = "select type['nested'] as nested from
dfs.`store/pcapng/sniff.pcapng`";
+
+ expectedSchema = new TupleSchema();
+ actual = client.queryBuilder().sql(query).rowSet();
+
+ expectedSchema.add(MaterializedField.create("nested",
Types.optional(TypeProtos.MinorType.INT)));
+
+ expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ }
+}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
new file mode 100644
index 0000000..98d7b67
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+
+public class TestPcapngRecordReader extends PlanTestBase {
+ @BeforeClass
+ public static void setupTestFiles() {
+ dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng"));
+ }
+
+ @Test
+ public void testStarQuery() throws Exception {
+ Assert.assertEquals(123, testSql("select * from
dfs.`store/pcapng/sniff.pcapng`"));
+ Assert.assertEquals(1, testSql("select * from
dfs.`store/pcapng/example.pcapng`"));
+ }
+
+ @Test
+ public void testProjectingByName() throws Exception {
+ Assert.assertEquals(123, testSql("select `timestamp`, packet_data, type
from dfs.`store/pcapng/sniff.pcapng`"));
+ Assert.assertEquals(1, testSql("select src_ip, dst_ip, `timestamp` from
dfs.`store/pcapng/example.pcapng`"));
+ }
+
+ @Test
+ public void testDiffCaseQuery() throws Exception {
+ Assert.assertEquals(123, testSql("select `timestamp`, paCket_dAta, TyPe
from dfs.`store/pcapng/sniff.pcapng`"));
+ Assert.assertEquals(1, testSql("select src_ip, dst_ip, `Timestamp` from
dfs.`store/pcapng/example.pcapng`"));
+ }
+
+ @Test
+ public void testProjectingMissColls() throws Exception {
+ Assert.assertEquals(123, testSql("select `timestamp`, `name`, `color` from
dfs.`store/pcapng/sniff.pcapng`"));
+ Assert.assertEquals(1, testSql("select src_ip, `time` from
dfs.`store/pcapng/example.pcapng`"));
+ }
+
+
+ @Test
+ public void testCountQuery() throws Exception {
+ testBuilder()
+ .sqlQuery("select count(*) as ct from dfs.`store/pcapng/sniff.pcapng`")
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues(123L)
+ .build()
+ .run();
+
+ testBuilder()
+ .sqlQuery("select count(*) as ct from
dfs.`store/pcapng/example.pcapng`")
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues(1L)
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testGroupBy() throws Exception {
+ Assert.assertEquals(47, testSql("select src_ip, count(1),
sum(packet_length) from dfs.`store/pcapng/sniff.pcapng` group by src_ip"));
+ }
+
+ @Test
+ public void testDistinctQuery() throws Exception {
+ Assert.assertEquals(119, testSql("select distinct `timestamp`, src_ip from
dfs.`store/pcapng/sniff.pcapng`"));
+ Assert.assertEquals(1, testSql("select distinct packet_data from
dfs.`store/pcapng/example.pcapng`"));
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testBasicQueryWithIncorrectFileName() throws Exception {
+ testSql("select * from dfs.`store/pcapng/snaff.pcapng`");
+ }
+
+ @Test
+ public void testPhysicalPlanExecutionBasedOnQuery() throws Exception {
+ String query = "EXPLAIN PLAN for select * from
dfs.`store/pcapng/sniff.pcapng`";
+ String plan = getPlanInString(query, JSON_FORMAT);
+ Assert.assertEquals(123, testPhysical(plan));
+ }
+}
diff --git a/exec/java-exec/src/test/resources/store/pcapng/example.pcapng
b/exec/java-exec/src/test/resources/store/pcapng/example.pcapng
new file mode 100644
index 0000000..002cb8d
Binary files /dev/null and
b/exec/java-exec/src/test/resources/store/pcapng/example.pcapng differ
diff --git a/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng
b/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng
new file mode 100644
index 0000000..cd542bd
Binary files /dev/null and
b/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng differ
diff --git
a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 77bf211..c9e6dc2 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -585,6 +585,10 @@ public final class UserBitShared {
* <code>PARTITION_LIMIT = 54;</code>
*/
PARTITION_LIMIT(54, 54),
+ /**
+ * <code>PCAPNG_SUB_SCAN = 55;</code>
+ */
+ PCAPNG_SUB_SCAN(55, 55),
;
/**
@@ -807,6 +811,10 @@ public final class UserBitShared {
* <code>PARTITION_LIMIT = 54;</code>
*/
public static final int PARTITION_LIMIT_VALUE = 54;
+ /**
+ * <code>PCAPNG_SUB_SCAN = 55;</code>
+ */
+ public static final int PCAPNG_SUB_SCAN_VALUE = 55;
public final int getNumber() { return value; }
@@ -868,6 +876,7 @@ public final class UserBitShared {
case 52: return IMAGE_SUB_SCAN;
case 53: return SEQUENCE_SUB_SCAN;
case 54: return PARTITION_LIMIT;
+ case 55: return PCAPNG_SUB_SCAN;
default: return null;
}
}
@@ -24404,7 +24413,7 @@ public final class UserBitShared {
"TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020" +
"\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022"
+
"\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005"
+
- "\022\032\n\026CANCELLATION_REQUESTED\020\006*\316\010\n\020CoreOpe" +
+ "\022\032\n\026CANCELLATION_REQUESTED\020\006*\343\010\n\020CoreOpe" +
"ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS" +
"T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE"
+
"\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS"
+
@@ -24432,11 +24441,11 @@ public final class UserBitShared {
"ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI" +
"TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S",
"UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART" +
- "ITION_LIMIT\0206*g\n\nSaslStatus\022\020\n\014SASL_UNKN" +
- "OWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRES"
+
-
"S\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B."
+
- "\n\033org.apache.drill.exec.protoB\rUserBitSh" +
- "aredH\001"
+ "ITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207*g\n\nSa" +
+ "slStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START" +
+
"\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS" +
+ "\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill." +
+ "exec.protoB\rUserBitSharedH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner
assigner =
new
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 38ac50e..2d7d492 100644
---
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++
b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -76,7 +76,8 @@ public enum CoreOperatorType implements
com.dyuproject.protostuff.EnumLite<CoreO
HTPPD_LOG_SUB_SCAN(51),
IMAGE_SUB_SCAN(52),
SEQUENCE_SUB_SCAN(53),
- PARTITION_LIMIT(54);
+ PARTITION_LIMIT(54),
+ PCAPNG_SUB_SCAN(55);
public final int number;
@@ -149,6 +150,7 @@ public enum CoreOperatorType implements
com.dyuproject.protostuff.EnumLite<CoreO
case 52: return IMAGE_SUB_SCAN;
case 53: return SEQUENCE_SUB_SCAN;
case 54: return PARTITION_LIMIT;
+ case 55: return PCAPNG_SUB_SCAN;
default: return null;
}
}
diff --git a/protocol/src/main/protobuf/UserBitShared.proto
b/protocol/src/main/protobuf/UserBitShared.proto
index 65ebe0b..62802f6 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -343,6 +343,7 @@ enum CoreOperatorType {
IMAGE_SUB_SCAN = 52;
SEQUENCE_SUB_SCAN = 53;
PARTITION_LIMIT = 54;
+ PCAPNG_SUB_SCAN = 55;
}
/* Registry that contains list of jars, each jar contains its name and list of
function signatures.