This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 2dfd0da DRILL-6179: Added pcapng-format support 2dfd0da is described below commit 2dfd0dab41864f22c7ed924e17c6a8cf9b2f54ad Author: Vlad Storona <vstor...@cybervisiontech.com> AuthorDate: Thu Jun 21 16:41:17 2018 +0300 DRILL-6179: Added pcapng-format support --- .../native/client/src/protobuf/UserBitShared.pb.cc | 13 +- .../native/client/src/protobuf/UserBitShared.pb.h | 5 +- exec/java-exec/pom.xml | 5 + .../drill/exec/store/pcap/decoder/Packet.java | 16 +- .../exec/store/pcapng/PcapngFormatConfig.java | 52 +++ .../exec/store/pcapng/PcapngFormatPlugin.java | 76 ++++ .../exec/store/pcapng/PcapngRecordReader.java | 214 ++++++++++ .../exec/store/pcapng/decoder/PacketDecoder.java | 61 +++ .../drill/exec/store/pcapng/package-info.java | 23 ++ .../drill/exec/store/pcapng/schema/Column.java | 28 ++ .../exec/store/pcapng/schema/DummyArrayImpl.java | 34 ++ .../drill/exec/store/pcapng/schema/DummyImpl.java | 34 ++ .../drill/exec/store/pcapng/schema/Schema.java | 441 +++++++++++++++++++++ .../drill/exec/store/pcapng/schema/Util.java | 59 +++ .../main/resources/bootstrap-storage-plugins.json | 3 + .../store/dfs/TestFormatPluginOptionExtractor.java | 1 + .../drill/exec/store/pcapng/TestPcapngHeaders.java | 212 ++++++++++ .../exec/store/pcapng/TestPcapngRecordReader.java | 100 +++++ .../src/test/resources/store/pcapng/example.pcapng | Bin 0 -> 512 bytes .../src/test/resources/store/pcapng/sniff.pcapng | Bin 0 -> 33464 bytes .../org/apache/drill/exec/proto/UserBitShared.java | 21 +- .../drill/exec/proto/beans/CoreOperatorType.java | 4 +- protocol/src/main/protobuf/UserBitShared.proto | 1 + 23 files changed, 1380 insertions(+), 23 deletions(-) diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc index 7398048..8f98e06 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc @@ -750,7 +750,7 @@ void protobuf_AddDesc_UserBitShared_2eproto() { "TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020" "\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022" "\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005" - "\022\032\n\026CANCELLATION_REQUESTED\020\006*\316\010\n\020CoreOpe" + "\022\032\n\026CANCELLATION_REQUESTED\020\006*\343\010\n\020CoreOpe" "ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS" "T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE" "\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS" @@ -778,11 +778,11 @@ void protobuf_AddDesc_UserBitShared_2eproto() { "ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI" "TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S" "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART" - "ITION_LIMIT\0206*g\n\nSaslStatus\022\020\n\014SASL_UNKN" - "OWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRES" - "S\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B." - "\n\033org.apache.drill.exec.protoB\rUserBitSh" - "aredH\001", 5406); + "ITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207*g\n\nSa" + "slStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START" + "\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS" + "\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill." + "exec.protoB\rUserBitSharedH\001", 5427); ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( "UserBitShared.proto", &protobuf_RegisterTypes); UserCredentials::default_instance_ = new UserCredentials(); @@ -958,6 +958,7 @@ bool CoreOperatorType_IsValid(int value) { case 52: case 53: case 54: + case 55: return true; default: return false; diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h index 4599abb..a07cbfa 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.h +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h @@ -258,11 +258,12 @@ enum CoreOperatorType { HTPPD_LOG_SUB_SCAN = 51, IMAGE_SUB_SCAN = 52, SEQUENCE_SUB_SCAN = 53, - PARTITION_LIMIT = 54 + PARTITION_LIMIT = 54, + PCAPNG_SUB_SCAN = 55 }; bool CoreOperatorType_IsValid(int value); const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER; -const CoreOperatorType CoreOperatorType_MAX = PARTITION_LIMIT; +const CoreOperatorType CoreOperatorType_MAX = PCAPNG_SUB_SCAN; const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1; const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor(); diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index f175c65..f406895 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -534,6 +534,11 @@ <artifactId>metadata-extractor</artifactId> <version>2.11.0</version> </dependency> + <dependency> + <groupId>fr.bmartel</groupId> + <artifactId>pcapngdecoder</artifactId> + <version>1.2</version> + </dependency> </dependencies> <profiles> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java index 9cc98de..a0a07a9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java @@ -42,18 +42,18 @@ public class Packet { private long timestamp; private int originalLength; - private byte[] raw; + protected byte[] raw; // index into the raw data where the current ethernet packet starts private int etherOffset; // index into the raw data where the current IP packet starts. Should be just after etherOffset - private int ipOffset; + protected int ipOffset; private int packetLength; - private int etherProtocol; - private int protocol; + protected int etherProtocol; + protected int protocol; - private boolean isRoutingV6; + protected boolean isRoutingV6; @SuppressWarnings("WeakerAccess") public boolean readPcap(final InputStream in, final boolean byteOrder, final int maxLength) throws IOException { @@ -379,7 +379,7 @@ public class Packet { return (getByte(raw, ipOffset) & 0xf) * 4; } - private int ipVersion() { + protected int ipVersion() { return getByte(raw, ipOffset) >>> 4; } @@ -409,12 +409,12 @@ public class Packet { // everything is decoded lazily } - private int processIpV4Packet() { + protected int processIpV4Packet() { validateIpV4Packet(); return getByte(raw, ipOffset + 9); } - private int processIpV6Packet() { + protected int processIpV6Packet() { Preconditions.checkState(ipVersion() == 6, "Should have seen IP version 6, got %d", ipVersion()); int headerLength = 40; int nextHeader = raw[ipOffset + 6] & 0xff; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java new file mode 100644 index 0000000..7ff875a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.logical.FormatPluginConfig; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +@JsonTypeName("pcapng") +public class PcapngFormatConfig implements FormatPluginConfig { + + public List<String> extensions = Collections.singletonList("pcapng"); + + public List<String> getExtensions() { + return extensions; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PcapngFormatConfig that = (PcapngFormatConfig) o; + return Objects.equals(extensions, that.extensions); + } + + @Override + public int hashCode() { + return Objects.hash(extensions); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java new file mode 100644 index 0000000..832c0ec --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.store.RecordWriter; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; +import org.apache.drill.exec.store.dfs.easy.EasyWriter; +import org.apache.drill.exec.store.dfs.easy.FileWork; +import org.apache.hadoop.conf.Configuration; + +import java.util.List; + +public class PcapngFormatPlugin extends EasyFormatPlugin<PcapngFormatConfig> { + + public static final String DEFAULT_NAME = "pcapng"; + + public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf, + StoragePluginConfig storagePluginConfig) { + this(name, context, fsConf, storagePluginConfig, new PcapngFormatConfig()); + } + + public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, PcapngFormatConfig formatPluginConfig) { + super(name, context, fsConf, config, formatPluginConfig, true, + false, true, false, + formatPluginConfig.getExtensions(), DEFAULT_NAME); + } + + @Override + public boolean supportsPushDown() { + return true; + } + + @Override + public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, + FileWork fileWork, List<SchemaPath> columns, + String userName) { + return new PcapngRecordReader(fileWork.getPath(), dfs, columns); + } + + @Override + public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) { + throw new UnsupportedOperationException("unimplemented"); + } + + @Override + public int getReaderOperatorType() { + return UserBitShared.CoreOperatorType.PCAPNG_SUB_SCAN_VALUE; + } + + @Override + public int getWriterOperatorType() { + throw new UnsupportedOperationException("unimplemented"); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java new file mode 100644 index 0000000..b1c5f24 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import fr.bmartel.pcapdecoder.PcapDecoder; +import fr.bmartel.pcapdecoder.structure.types.IPcapngType; +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.commons.io.IOUtils; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.pcapng.schema.Column; +import org.apache.drill.exec.store.pcapng.schema.DummyArrayImpl; +import org.apache.drill.exec.store.pcapng.schema.DummyImpl; +import org.apache.drill.exec.store.pcapng.schema.Schema; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.function.BiConsumer; + +public class PcapngRecordReader extends AbstractRecordReader { + private static final Logger logger = LoggerFactory.getLogger(PcapngRecordReader.class); + + // batch size should not exceed max allowed record count + private static final int BATCH_SIZE = 40_000; + + private final Path pathToFile; + private OutputMutator output; + private List<ProjectedColumnInfo> projectedCols; + private FileSystem fs; + private FSDataInputStream in; + private List<SchemaPath> columns; + + private Iterator<IPcapngType> it; + + public PcapngRecordReader(final String pathToFile, + final FileSystem fileSystem, + final List<SchemaPath> columns) { + this.fs = fileSystem; + this.pathToFile = fs.makeQualified(new Path(pathToFile)); + this.columns = columns; + setColumns(columns); + } + + @Override + public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException { + try { + + this.output = output; + this.in = fs.open(pathToFile); + PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in)); + decoder.decode(); + this.it = decoder.getSectionList().iterator(); + setupProjection(); + } catch (IOException io) { + throw UserException.dataReadError(io) + .addContext("File name:", pathToFile.toUri().getPath()) + .build(logger); + } + } + + @Override + public int next() { + if (isSkipQuery()) { + return iterateOverBlocks((block, counter) -> { + }); + } else { + return iterateOverBlocks((block, counter) -> putToTable((IEnhancedPacketBLock) block, counter)); + } + } + + private void putToTable(IEnhancedPacketBLock bLock, Integer counter) { + for (ProjectedColumnInfo pci : projectedCols) { + pci.getColumn().process(bLock, pci.getVv(), counter); + } + } + + @Override + public void close() throws Exception { + if (in != null) { + in.close(); + in = null; + } + } + + private void setupProjection() { + if (isSkipQuery()) { + projectedCols = projectNone(); + } else if (isStarQuery()) { + projectedCols = projectAllCols(Schema.getColumnsNames()); + } else { + projectedCols = projectCols(columns); + } + } + + private List<ProjectedColumnInfo> projectNone() { + List<ProjectedColumnInfo> pciBuilder = new ArrayList<>(); + pciBuilder.add(makeColumn("dummy", new DummyImpl())); + return Collections.unmodifiableList(pciBuilder); + } + + private List<ProjectedColumnInfo> projectAllCols(final Set<String> columns) { + List<ProjectedColumnInfo> pciBuilder = new ArrayList<>(); + for (String colName : columns) { + pciBuilder.add(makeColumn(colName, Schema.getColumns().get(colName))); + } + return Collections.unmodifiableList(pciBuilder); + } + + private List<ProjectedColumnInfo> projectCols(final List<SchemaPath> columns) { + List<ProjectedColumnInfo> pciBuilder = new ArrayList<>(); + for (SchemaPath schemaPath : columns) { + String projectedName = schemaPath.rootName(); + if (schemaPath.isArray()) { + pciBuilder.add(makeColumn(projectedName, new DummyArrayImpl())); + } else if (Schema.getColumns().containsKey(projectedName.toLowerCase())) { + pciBuilder.add(makeColumn(projectedName, + Schema.getColumns().get(projectedName.toLowerCase()))); + } else { + pciBuilder.add(makeColumn(projectedName, new DummyImpl())); + } + } + return Collections.unmodifiableList(pciBuilder); + } + + private ProjectedColumnInfo makeColumn(final String colName, final Column column) { + MaterializedField field = MaterializedField.create(colName, column.getMinorType()); + ValueVector vector = getValueVector(field, output); + return new ProjectedColumnInfo(vector, column, colName); + } + + private ValueVector getValueVector(final MaterializedField field, final OutputMutator output) { + try { + TypeProtos.MajorType majorType = field.getType(); + final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass( + majorType.getMinorType(), majorType.getMode()); + + return output.addField(field, clazz); + } catch (SchemaChangeException sce) { + throw UserException.internalError(sce) + .addContext("The addition of this field is incompatible with this OutputMutator's capabilities") + .build(logger); + } + } + + private Integer iterateOverBlocks(BiConsumer<IPcapngType, Integer> consumer) { + int counter = 0; + while (it.hasNext() && counter < BATCH_SIZE) { + IPcapngType block = it.next(); + if (block instanceof IEnhancedPacketBLock) { + consumer.accept(block, counter); + counter++; + } + } + return counter; + } + + private static class ProjectedColumnInfo { + + private ValueVector vv; + private Column colDef; + private String columnName; + + ProjectedColumnInfo(ValueVector vv, Column colDef, String columnName) { + this.vv = vv; + this.colDef = colDef; + this.columnName = columnName; + } + + public ValueVector getVv() { + return vv; + } + + Column getColumn() { + return colDef; + } + + public String getColumnName() { + return columnName; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java new file mode 100644 index 0000000..ea5d831 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.decoder; + +import org.apache.drill.exec.store.pcap.decoder.Packet; +import org.apache.drill.exec.store.pcap.decoder.PacketConstants; + +import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getByte; +import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getShort; + +public class PacketDecoder extends Packet { + + @SuppressWarnings("WeakerAccess") + public boolean readPcapng(final byte[] raw) { + this.raw = raw; + return decodeEtherPacket(); + } + + private boolean decodeEtherPacket() { + etherProtocol = getShort(raw, PacketConstants.PACKET_PROTOCOL_OFFSET); + ipOffset = PacketConstants.IP_OFFSET; + if (isIpV4Packet()) { + protocol = processIpV4Packet(); + return true; + } else if (isIpV6Packet()) { + int tmp = processIpV6Packet(); + if (tmp != -1) { + protocol = tmp; + } + return true; + } else if (isPPPoV6Packet()) { + protocol = getByte(raw, 48); + return true; + } + return false; + } + + @Override + protected int processIpV6Packet() { + try { + return super.processIpV6Packet(); + } catch (IllegalStateException ise) { + return -1; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java new file mode 100644 index 0000000..dafeaa3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * For comments on realization of this format plugin look at : + * + * @see <a href="https://issues.apache.org/jira/browse/DRILL-6179"> Jira</a> + */ +package org.apache.drill.exec.store.pcapng; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java new file mode 100644 index 0000000..109b7dd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.vector.ValueVector; + +public interface Column { + TypeProtos.MajorType getMinorType(); + + void process(IEnhancedPacketBLock block, ValueVector vv, int count); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java new file mode 100644 index 0000000..2023d19 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.vector.ValueVector; + +public class DummyArrayImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.repeated(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java new file mode 100644 index 0000000..a8c26a0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.vector.ValueVector; + +public class DummyImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java new file mode 100644 index 0000000..a9738bd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder; +import org.apache.drill.exec.vector.ValueVector; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII; +import static org.apache.drill.exec.store.pcapng.schema.Util.setNullableLongColumnValue; + +public class Schema { + + private final static Map<String, Column> columns = new HashMap<>(); + + static { + columns.put("timestamp", new TimestampImpl()); + columns.put("packet_length", new PacketLenImpl()); + columns.put("type", new TypeImpl()); + columns.put("src_ip", new SrcIpImpl()); + columns.put("dst_ip", new DstIpImpl()); + columns.put("src_port", new SrcPortImpl()); + columns.put("dst_port", new DstPortImpl()); + columns.put("src_mac_address", new SrcMacImpl()); + columns.put("dst_mac_address", new DstMacImpl()); + columns.put("tcp_session", new TcpSessionImpl()); + columns.put("tcp_ack", new TcpAckImpl()); + columns.put("tcp_flags", new TcpFlags()); + columns.put("tcp_flags_ns", new TcpFlagsNsImpl()); + columns.put("tcp_flags_cwr", new TcpFlagsCwrImpl()); + columns.put("tcp_flags_ece", new TcpFlagsEceImpl()); + columns.put("tcp_flags_ece_ecn_capable", new TcpFlagsEceEcnCapableImpl()); + columns.put("tcp_flags_ece_congestion_experienced", new TcpFlagsEceCongestionExperiencedImpl()); + columns.put("tcp_flags_urg", new TcpFlagsUrgIml()); + columns.put("tcp_flags_ack", new TcpFlagsAckImpl()); + columns.put("tcp_flags_psh", new TcpFlagsPshImpl()); + columns.put("tcp_flags_rst", new TcpFlagsRstImpl()); + columns.put("tcp_flags_syn", new TcpFlagsSynImpl()); + columns.put("tcp_flags_fin", new TcpFlagsFinImpl()); + columns.put("tcp_parsed_flags", new TcpParsedFlags()); + columns.put("packet_data", new PacketDataImpl()); + } + + public static Map<String, Column> getColumns() { + return columns; + } + + public static Set<String> getColumnsNames() { + return columns.keySet(); + } + + static class TimestampImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.required(TypeProtos.MinorType.TIMESTAMP); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + Util.setTimestampColumnValue(block.getTimeStamp(), vv, count); + } + } + + static class PacketLenImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.required(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + Util.setIntegerColumnValue(block.getPacketLength(), vv, count); + } + } + + static class TypeImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getPacketType(), vv, count); + } + } + } + + static class SrcIpImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getSrc_ip().getHostAddress(), vv, count); + } + } + } + + static class DstIpImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getDst_ip().getHostAddress(), vv, count); + } + } + } + + static class SrcPortImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableIntegerColumnValue(packet.getSrc_port(), vv, count); + } + } + } + + static class DstPortImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableIntegerColumnValue(packet.getDst_port(), vv, count); + } + } + } + + static class SrcMacImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getEthernetSource(), vv, count); + } + } + } + + static class DstMacImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getEthernetDestination(), vv, count); + } + } + } + + static class TcpSessionImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.BIGINT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + setNullableLongColumnValue(packet.getSessionHash(), vv, count); + } + } + } + + static class TcpAckImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableIntegerColumnValue(packet.getAckNumber(), vv, count); + } + } + } + + static class TcpFlags implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableIntegerColumnValue(packet.getFlags(), vv, count); + } + } + } + + static class TcpFlagsNsImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x100) != 0, vv, count); + } + } + } + + static class TcpFlagsCwrImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x80) != 0, vv, count); + } + } + } + + static class TcpFlagsEceImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x40) != 0, vv, count); + } + } + } + + static class TcpFlagsEceEcnCapableImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x42, vv, count); + } + } + } + + static class TcpFlagsEceCongestionExperiencedImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x40, vv, count); + } + } + } + + static class TcpFlagsUrgIml implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x20) != 0, vv, count); + } + } + } + + static class TcpFlagsAckImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x10) != 0, vv, count); + } + } + } + + static class TcpFlagsPshImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x8) != 0, vv, count); + } + } + } + + static class TcpFlagsRstImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x4) != 0, vv, count); + } + } + } + + static class TcpFlagsSynImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x2) != 0, vv, count); + } + } + } + + static class TcpFlagsFinImpl implements Column { + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x1) != 0, vv, count); + } + } + } + + static class TcpParsedFlags implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getParsedFlags(), vv, count); + } + } + } + + static class PacketDataImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(parseBytesToASCII(block.getPacketData()), vv, count); + } + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java new file mode 100644 index 0000000..06e8e6a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import org.apache.drill.exec.vector.IntVector; +import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.TimeStampVector; +import org.apache.drill.exec.vector.ValueVector; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class Util { + static void setNullableIntegerColumnValue(final int data, final ValueVector vv, final int count) { + ((NullableIntVector.Mutator) vv.getMutator()) + .setSafe(count, data); + } + + static void setIntegerColumnValue(final int data, final ValueVector vv, final int count) { + ((IntVector.Mutator) vv.getMutator()) + .setSafe(count, data); + } + + static void setTimestampColumnValue(final long data, final ValueVector vv, final int count) { + ((TimeStampVector.Mutator) vv.getMutator()) + .setSafe(count, data / 1000); + } + + static void setNullableLongColumnValue(final long data, final ValueVector vv, final int count) { + ((NullableBigIntVector.Mutator) vv.getMutator()) + .setSafe(count, data); + } + + static void setNullableStringColumnValue(final String data, final ValueVector vv, final int count) { + ((NullableVarCharVector.Mutator) vv.getMutator()) + .setSafe(count, data.getBytes(UTF_8), 0, data.length()); + } + + static void setNullableBooleanColumnValue(final boolean data, final ValueVector vv, final int count) { + ((NullableIntVector.Mutator) vv.getMutator()) + .setSafe(count, data ? 1 : 0); + } +} \ No newline at end of file diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json index 42cddd8..46f1620 100644 --- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json +++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json @@ -46,6 +46,9 @@ "pcap" : { type: "pcap" }, + "pcapng" : { + type: "pcapng" + }, "avro" : { type: "avro" }, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java index f51fe4c..e53c394 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java @@ -59,6 +59,7 @@ public class TestFormatPluginOptionExtractor { case "json": case "sequencefile": case "pcap": + case "pcapng": case "avro": assertEquals(d.typeName, "(type: String)", d.presentParams()); break; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java new file mode 100644 index 0000000..5dcffa9 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Paths; + +public class TestPcapngHeaders extends ClusterTest { + @BeforeClass + public static void setupTestFiles() throws Exception { + startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1)); + dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng")); + } + + @Test + public void testValidHeadersForStarQuery() throws IOException { + String query = "select * from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("tcp_flags_ece_ecn_capable", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_ece_congestion_experienced", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_psh", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("type", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_flags_cwr", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("dst_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("src_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_flags_fin", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_ece", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_ack", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("src_mac_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_flags_syn", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_rst", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT))); + expectedSchema.add(MaterializedField.create("packet_data", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_parsed_flags", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_flags_ns", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("src_port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("packet_length", Types.required(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_urg", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_ack", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("dst_port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("dst_mac_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testValidHeadersForProjection() throws IOException { + String query = "select sRc_ip, dst_IP, dst_mAc_address, src_Port, tcp_session, `Timestamp` from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("sRc_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_IP", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_mAc_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("src_Port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT))); + expectedSchema.add(MaterializedField.create("Timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testValidHeadersForMissColumns() throws IOException { + String query = "select `timestamp`, `name`, `color` from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + expectedSchema.add(MaterializedField.create("name", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("color", Types.optional(TypeProtos.MinorType.INT))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testMixColumns() throws IOException { + String query = "select src_ip, dst_ip, dst_mac_address, src_port, tcp_session, `timestamp` from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("sRc_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_IP", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_mAc_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("src_Port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT))); + expectedSchema.add(MaterializedField.create("Timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + + String queryWithDiffOrder = "select `timestamp`, src_ip, dst_ip, src_port, tcp_session, dst_mac_address from dfs.`store/pcapng/sniff.pcapng`"; + actual = client.queryBuilder().sql(queryWithDiffOrder).rowSet(); + + expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + expectedSchema.add(MaterializedField.create("src_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("src_port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT))); + expectedSchema.add(MaterializedField.create("dst_mac_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + + expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testValidHeaderForArrayColumns() throws IOException { + // query with non-existent field + String query = "select arr[3] as arr from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("arr", Types.optional(TypeProtos.MinorType.INT))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + + // query with an existent field which doesn't support arrays + query = "select type[45] as arr from dfs.`store/pcapng/sniff.pcapng`"; + + expectedSchema = new TupleSchema(); + actual = client.queryBuilder().sql(query).rowSet(); + + expectedSchema.add(MaterializedField.create("arr", Types.optional(TypeProtos.MinorType.INT))); + + expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testValidHeaderForNestedColumns() throws IOException { + // query with non-existent field + String query = "select top['nested'] as nested from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("nested", Types.optional(TypeProtos.MinorType.INT))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + + // query with an existent field which doesn't support nesting + query = "select type['nested'] as nested from dfs.`store/pcapng/sniff.pcapng`"; + + expectedSchema = new TupleSchema(); + actual = client.queryBuilder().sql(query).rowSet(); + + expectedSchema.add(MaterializedField.create("nested", Types.optional(TypeProtos.MinorType.INT))); + + expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java new file mode 100644 index 0000000..98d7b67 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import org.apache.drill.PlanTestBase; +import org.apache.drill.common.exceptions.UserRemoteException; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.file.Paths; + +public class TestPcapngRecordReader extends PlanTestBase { + @BeforeClass + public static void setupTestFiles() { + dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng")); + } + + @Test + public void testStarQuery() throws Exception { + Assert.assertEquals(123, testSql("select * from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select * from dfs.`store/pcapng/example.pcapng`")); + } + + @Test + public void testProjectingByName() throws Exception { + Assert.assertEquals(123, testSql("select `timestamp`, packet_data, type from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select src_ip, dst_ip, `timestamp` from dfs.`store/pcapng/example.pcapng`")); + } + + @Test + public void testDiffCaseQuery() throws Exception { + Assert.assertEquals(123, testSql("select `timestamp`, paCket_dAta, TyPe from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select src_ip, dst_ip, `Timestamp` from dfs.`store/pcapng/example.pcapng`")); + } + + @Test + public void testProjectingMissColls() throws Exception { + Assert.assertEquals(123, testSql("select `timestamp`, `name`, `color` from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select src_ip, `time` from dfs.`store/pcapng/example.pcapng`")); + } + + + @Test + public void testCountQuery() throws Exception { + testBuilder() + .sqlQuery("select count(*) as ct from dfs.`store/pcapng/sniff.pcapng`") + .ordered() + .baselineColumns("ct") + .baselineValues(123L) + .build() + .run(); + + testBuilder() + .sqlQuery("select count(*) as ct from dfs.`store/pcapng/example.pcapng`") + .ordered() + .baselineColumns("ct") + .baselineValues(1L) + .build() + .run(); + } + + @Test + public void testGroupBy() throws Exception { + Assert.assertEquals(47, testSql("select src_ip, count(1), sum(packet_length) from dfs.`store/pcapng/sniff.pcapng` group by src_ip")); + } + + @Test + public void testDistinctQuery() throws Exception { + Assert.assertEquals(119, testSql("select distinct `timestamp`, src_ip from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select distinct packet_data from dfs.`store/pcapng/example.pcapng`")); + } + + @Test(expected = UserRemoteException.class) + public void testBasicQueryWithIncorrectFileName() throws Exception { + testSql("select * from dfs.`store/pcapng/snaff.pcapng`"); + } + + @Test + public void testPhysicalPlanExecutionBasedOnQuery() throws Exception { + String query = "EXPLAIN PLAN for select * from dfs.`store/pcapng/sniff.pcapng`"; + String plan = getPlanInString(query, JSON_FORMAT); + Assert.assertEquals(123, testPhysical(plan)); + } +} diff --git a/exec/java-exec/src/test/resources/store/pcapng/example.pcapng b/exec/java-exec/src/test/resources/store/pcapng/example.pcapng new file mode 100644 index 0000000..002cb8d Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcapng/example.pcapng differ diff --git a/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng b/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng new file mode 100644 index 0000000..cd542bd Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng differ diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 77bf211..c9e6dc2 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -585,6 +585,10 @@ public final class UserBitShared { * <code>PARTITION_LIMIT = 54;</code> */ PARTITION_LIMIT(54, 54), + /** + * <code>PCAPNG_SUB_SCAN = 55;</code> + */ + PCAPNG_SUB_SCAN(55, 55), ; /** @@ -807,6 +811,10 @@ public final class UserBitShared { * <code>PARTITION_LIMIT = 54;</code> */ public static final int PARTITION_LIMIT_VALUE = 54; + /** + * <code>PCAPNG_SUB_SCAN = 55;</code> + */ + public static final int PCAPNG_SUB_SCAN_VALUE = 55; public final int getNumber() { return value; } @@ -868,6 +876,7 @@ public final class UserBitShared { case 52: return IMAGE_SUB_SCAN; case 53: return SEQUENCE_SUB_SCAN; case 54: return PARTITION_LIMIT; + case 55: return PCAPNG_SUB_SCAN; default: return null; } } @@ -24404,7 +24413,7 @@ public final class UserBitShared { "TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020" + "\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022" + "\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005" + - "\022\032\n\026CANCELLATION_REQUESTED\020\006*\316\010\n\020CoreOpe" + + "\022\032\n\026CANCELLATION_REQUESTED\020\006*\343\010\n\020CoreOpe" + "ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS" + "T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE" + "\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS" + @@ -24432,11 +24441,11 @@ public final class UserBitShared { "ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI" + "TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S", "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART" + - "ITION_LIMIT\0206*g\n\nSaslStatus\022\020\n\014SASL_UNKN" + - "OWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRES" + - "S\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B." + - "\n\033org.apache.drill.exec.protoB\rUserBitSh" + - "aredH\001" + "ITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207*g\n\nSa" + + "slStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START" + + "\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS" + + "\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill." + + "exec.protoB\rUserBitSharedH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java index 38ac50e..2d7d492 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java @@ -76,7 +76,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO HTPPD_LOG_SUB_SCAN(51), IMAGE_SUB_SCAN(52), SEQUENCE_SUB_SCAN(53), - PARTITION_LIMIT(54); + PARTITION_LIMIT(54), + PCAPNG_SUB_SCAN(55); public final int number; @@ -149,6 +150,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO case 52: return IMAGE_SUB_SCAN; case 53: return SEQUENCE_SUB_SCAN; case 54: return PARTITION_LIMIT; + case 55: return PCAPNG_SUB_SCAN; default: return null; } } diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto index 65ebe0b..62802f6 100644 --- a/protocol/src/main/protobuf/UserBitShared.proto +++ b/protocol/src/main/protobuf/UserBitShared.proto @@ -343,6 +343,7 @@ enum CoreOperatorType { IMAGE_SUB_SCAN = 52; SEQUENCE_SUB_SCAN = 53; PARTITION_LIMIT = 54; + PCAPNG_SUB_SCAN = 55; } /* Registry that contains list of jars, each jar contains its name and list of function signatures.