METRON-1728: Handle null values in config in Pcap backend more gracefully (mmiklavc via mmiklavc) closes apache/metron#1151
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/9064cca0 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/9064cca0 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/9064cca0 Branch: refs/heads/master Commit: 9064cca0317881176471c51abd16e99bf2ad7b10 Parents: 14dcb2d Author: mmiklavc <michael.miklav...@gmail.com> Authored: Thu Aug 9 09:25:29 2018 -0600 Committer: Michael Miklavcic <michael.miklav...@gmail.com> Committed: Thu Aug 9 09:25:29 2018 -0600 ---------------------------------------------------------------------- .../common/configuration/ConfigOption.java | 32 ++++-- .../common/configuration/ConfigOptionTest.java | 112 +++++++++++++++++++ .../org/apache/metron/pcap/query/CliParser.java | 25 +++-- .../org/apache/metron/pcap/PcapJobTest.java | 23 ++++ .../apache/metron/pcap/query/PcapCliTest.java | 10 +- .../metron/pcap/config/PcapGlobalDefaults.java | 28 +++++ .../metron/pcap/finalizer/PcapFinalizer.java | 8 +- .../pcap/finalizer/PcapRestFinalizer.java | 11 +- .../java/org/apache/metron/pcap/mr/PcapJob.java | 25 +++-- 9 files changed, 237 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java index 8e4211b..6308f0a 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java @@ -18,36 +18,54 @@ package org.apache.metron.common.configuration; -import org.apache.metron.stellar.common.utils.ConversionUtils; - import java.util.Map; import java.util.function.BiFunction; +import org.apache.metron.stellar.common.utils.ConversionUtils; public interface ConfigOption { + String getKey(); + default BiFunction<String, Object, Object> transform() { - return (s,o) -> o; + return (s, o) -> o; } default void put(Map<String, Object> map, Object value) { map.put(getKey(), value); } + default <T> T getOrDefault(Map<String, Object> map, Class<T> clazz, T defaultValue) { + T val; + return ((val = get(map, clazz)) == null ? defaultValue : val); + } + default <T> T get(Map<String, Object> map, Class<T> clazz) { Object obj = map.get(getKey()); - if(clazz.isInstance(obj)) { + if (clazz.isInstance(obj)) { return clazz.cast(obj); - } - else { + } else { return ConversionUtils.convert(obj, clazz); } } - default <T> T get(Map<String, Object> map, BiFunction<String, Object, T> transform, Class<T> clazz) { + default <T> T getOrDefault(Map<String, Object> map, BiFunction<String, Object, T> transform, + Class<T> clazz, T defaultValue) { + T val; + return ((val = get(map, transform, clazz)) == null ? defaultValue : val); + } + + default <T> T get(Map<String, Object> map, BiFunction<String, Object, T> transform, + Class<T> clazz) { return clazz.cast(transform.apply(getKey(), map.get(getKey()))); } + default <T> T getTransformedOrDefault(Map<String, Object> map, Class<T> clazz, T defaultValue) { + T val; + return ((val = getTransformed(map, clazz)) == null ? defaultValue : val); + } + default <T> T getTransformed(Map<String, Object> map, Class<T> clazz) { return clazz.cast(transform().apply(getKey(), map.get(getKey()))); } + } http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java new file mode 100644 index 0000000..95db080 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.configuration; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.BiFunction; +import org.junit.Before; +import org.junit.Test; + +/** + * Test the default interface methods + */ +public class ConfigOptionTest { + + @Before + public void setup() { + } + + @Test + public void gets_value_of_specified_type() { + ConfigOption option = newOption("foo"); + Map<String, Object> config = new HashMap<>(); + option.put(config, 25L); + assertThat(option.get(config, Long.class), equalTo(25L)); + assertThat(option.get(mapWith("foo", 25L), Long.class), equalTo(25L)); + } + + @Test + public void gets_value_of_specified_type_with_transform() { + ConfigOption option = newOption("foo"); + Map<String, Object> config = new HashMap<>(); + option.put(config, "25"); + BiFunction<String, Object, Long> transform = (s, o) -> o == null ? null + : new Long(o.toString()); + assertThat(option.get(config, transform, Long.class), equalTo(25L)); + assertThat(option.get(mapWith("foo", "25"), transform, Long.class), equalTo(25L)); + } + + @Test + public void gets_default_value_of_specified_type_with_transform() { + ConfigOption option = newOption("foo"); + Map<String, Object> config = new HashMap<>(); + option.put(config, null); + BiFunction<String, Object, Long> transform = (s, o) -> o == null ? null + : new Long(o.toString()); + assertThat(option.getOrDefault(config, transform, Long.class, 25L), equalTo(25L)); + assertThat(option.getOrDefault(mapWith("foo", null), transform, Long.class, 25L), equalTo(25L)); + } + + @Test + public void gets_default_when_null_value() { + ConfigOption option = newOption("foo"); + Map<String, Object> config = new HashMap<>(); + option.put(config, null); + assertThat(option.getOrDefault(config, Long.class, 0L), equalTo(0L)); + assertThat(option.getOrDefault(mapWith("foo", null), Long.class, 0L), equalTo(0L)); + } + + @Test + public void gets_object_transformed_by_class_cast() { + ConfigOption option = newOption("foo"); + Map<String, Object> config = new HashMap<>(); + option.put(config, (Object) 25L); + assertThat(option.getTransformed(config, Long.class), equalTo(25L)); + assertThat(option.getTransformed(mapWith("foo", (Object) 25L), Long.class), equalTo(25L)); + } + + @Test + public void gets_default_null_with_cast_when_null() { + ConfigOption option = newOption("foo"); + Map<String, Object> config = new HashMap<>(); + option.put(config, null); + assertThat(option.getTransformedOrDefault(config, Long.class, 25L), equalTo(25L)); + assertThat(option.getTransformedOrDefault(mapWith("foo", null), Long.class, 25L), equalTo(25L)); + } + + private <K, V> Map<K, V> mapWith(K key, V val) { + Map<K, V> map = new HashMap<>(); + map.put(key, val); + return map; + } + + private ConfigOption newOption(final String key) { + return new ConfigOption() { + @Override + public String getKey() { + return key; + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java index 4ad6ffa..2d15e8b 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java @@ -18,17 +18,23 @@ package org.apache.metron.pcap.query; -import org.apache.commons.cli.*; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INPUT_PATH_DEFAULT; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_RECORDS_PER_FILE_DEFAULT; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_REDUCERS_DEFAULT; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.metron.pcap.config.PcapConfig; /** * Provides commmon required fields for the PCAP filter jobs */ public class CliParser { - public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap/input"; - public static final String BASE_INTERIM_OUTPUT_PATH_DEFAULT = "/tmp"; - public static final int NUM_REDUCERS_DEFAULT = 10; - public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000; private CommandLineParser parser; protected PcapConfig.PrefixStrategy prefixStrategy; @@ -40,9 +46,10 @@ public class CliParser { public Options buildOptions() { Options options = new Options(); options.addOption(newOption("h", "help", false, "Display help")); - options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", BASE_PATH_DEFAULT))); + options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", + BASE_INPUT_PATH_DEFAULT))); options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", - BASE_INTERIM_OUTPUT_PATH_DEFAULT))); + BASE_INTERIM_RESULT_PATH_DEFAULT))); options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true)); options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT))); options.addOption(newOption("rpf", "records_per_file", true, String.format("Number of records to include in each output pcap file (defaults to %s)", NUM_RECORDS_PER_FILE_DEFAULT))); @@ -71,12 +78,12 @@ public class CliParser { if (commandLine.hasOption("base_path")) { config.setBasePath(commandLine.getOptionValue("base_path")); } else { - config.setBasePath(BASE_PATH_DEFAULT); + config.setBasePath(BASE_INPUT_PATH_DEFAULT); } if (commandLine.hasOption("base_output_path")) { config.setBaseInterimResultPath(commandLine.getOptionValue("base_output_path")); } else { - config.setBaseInterimResultPath(BASE_INTERIM_OUTPUT_PATH_DEFAULT); + config.setBaseInterimResultPath(BASE_INTERIM_RESULT_PATH_DEFAULT); } if (commandLine.hasOption("start_time")) { try { http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index 14963fd..796c8a5 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -264,4 +264,27 @@ public class PcapJobTest { Assert.assertThat(status.getState(), equalTo(State.KILLED)); } + @Test + public void handles_null_values_with_defaults() throws Exception { + PcapOptions.START_TIME_NS.put(config, null); + PcapOptions.END_TIME_NS.put(config, null); + PcapOptions.NUM_REDUCERS.put(config, null); + PcapOptions.NUM_RECORDS_PER_FILE.put(config, null); + + pageableResult = new PcapPages( + Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt"))); + when(finalizer.finalizeJob(any())).thenReturn(pageableResult); + when(mrJob.isComplete()).thenReturn(true); + when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); + when(mrJob.getStatus()).thenReturn(mrStatus); + Statusable<Path> statusable = testJob.submit(finalizer, config); + timer.updateJobStatus(); + Pageable<Path> results = statusable.get(); + Assert.assertThat(results.getSize(), equalTo(3)); + JobStatus status = statusable.getStatus(); + Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); + Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); + Assert.assertThat(status.getJobId(), equalTo(jobIdVal)); + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java index c7d6fdf..96ca354 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java @@ -17,6 +17,8 @@ */ package org.apache.metron.pcap.query; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INPUT_PATH_DEFAULT; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.argThat; @@ -91,8 +93,8 @@ public class PcapCliTest { put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "`casey`"); }}; FixedPcapConfig config = new FixedPcapConfig(prefixStrategy); - PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT); - PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT); + PcapOptions.BASE_PATH.put(config, BASE_INPUT_PATH_DEFAULT); + PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, BASE_INTERIM_RESULT_PATH_DEFAULT); PcapOptions.FIELDS.put(config, query); PcapOptions.NUM_REDUCERS.put(config, 10); PcapOptions.START_TIME_MS.put(config, 500L); @@ -237,8 +239,8 @@ public class PcapCliTest { String query = "some query string"; FixedPcapConfig config = new FixedPcapConfig(prefixStrategy); - PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT); - PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT); + PcapOptions.BASE_PATH.put(config, BASE_INPUT_PATH_DEFAULT); + PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, BASE_INTERIM_RESULT_PATH_DEFAULT); PcapOptions.FIELDS.put(config, query); PcapOptions.NUM_REDUCERS.put(config, 10); PcapOptions.START_TIME_MS.put(config, 500L); http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java new file mode 100644 index 0000000..b8c674c --- /dev/null +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.pcap.config; + +public class PcapGlobalDefaults { + public static final String BASE_PCAP_PATH_DEFAULT = "/apps/metron/pcap"; + public static final String BASE_INPUT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/input"; + public static final String BASE_INTERIM_RESULT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/interim"; + public static final String FINAL_OUTPUT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/output"; + public static final int NUM_REDUCERS_DEFAULT = 10; + public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000; +} http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java index 8dcc401..5a61f9b 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java @@ -18,6 +18,8 @@ package org.apache.metron.pcap.finalizer; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_RECORDS_PER_FILE_DEFAULT; + import com.google.common.collect.Iterables; import java.io.IOException; import java.lang.invoke.MethodHandles; @@ -62,9 +64,9 @@ public abstract class PcapFinalizer implements Finalizer<Path> { @Override public Pageable<Path> finalizeJob(Map<String, Object> config) throws JobException { Configuration hadoopConfig = PcapOptions.HADOOP_CONF.get(config, Configuration.class); - int recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(config, Integer.class); - Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH - .get(config, PcapOptions.STRING_TO_PATH, Path.class); + int recPerFile = PcapOptions.NUM_RECORDS_PER_FILE + .getOrDefault(config, Integer.class, NUM_RECORDS_PER_FILE_DEFAULT); + Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class); FileSystem fs = PcapOptions.FILESYSTEM.get(config, FileSystem.class); SequenceFileIterable interimResults = null; http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java index 93a3222..13fa795 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java @@ -18,14 +18,15 @@ package org.apache.metron.pcap.finalizer; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.FINAL_OUTPUT_PATH_DEFAULT; + import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.metron.job.Statusable; import org.apache.metron.pcap.config.PcapOptions; - -import java.util.Map; import org.apache.metron.pcap.writer.PcapResultsWriter; /** @@ -45,10 +46,12 @@ public class PcapRestFinalizer extends PcapFinalizer { @Override protected Path getOutputPath(Map<String, Object> config, int partition) { - String finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, String.class); + String finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH + .getOrDefault(config, String.class, FINAL_OUTPUT_PATH_DEFAULT); String user = PcapOptions.USERNAME.get(config, String.class); String jobId = PcapOptions.JOB_ID.get(config, String.class); - return new Path(String.format(PCAP_REST_FILEPATH_FORMAT, finalOutputPath, user, jobType, jobId, partition)); + return new Path( + String.format(PCAP_REST_FILEPATH_FORMAT, finalOutputPath, user, jobType, jobId, partition)); } } http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index ea2aa29..23bd510 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -20,6 +20,7 @@ package org.apache.metron.pcap.mr; import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo; import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo; +import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_REDUCERS_DEFAULT; import com.google.common.base.Joiner; import java.io.IOException; @@ -51,6 +52,7 @@ import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.metron.common.utils.timestamp.TimestampConverters; import org.apache.metron.job.Finalizer; import org.apache.metron.job.JobException; import org.apache.metron.job.JobStatus; @@ -60,6 +62,7 @@ import org.apache.metron.job.Statusable; import org.apache.metron.pcap.PacketInfo; import org.apache.metron.pcap.PcapHelper; import org.apache.metron.pcap.PcapPages; +import org.apache.metron.pcap.config.PcapGlobalDefaults; import org.apache.metron.pcap.config.PcapOptions; import org.apache.metron.pcap.filter.PcapFilter; import org.apache.metron.pcap.filter.PcapFilterConfigurator; @@ -216,20 +219,22 @@ public class PcapJob<T> implements Statusable<Path> { Configuration hadoopConf = PcapOptions.HADOOP_CONF.get(configuration, Configuration.class); FileSystem fileSystem = PcapOptions.FILESYSTEM.get(configuration, FileSystem.class); Path basePath = PcapOptions.BASE_PATH.getTransformed(configuration, Path.class); - Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH.getTransformed(configuration, Path.class); - long startTime; + Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH + .getTransformedOrDefault(configuration, Path.class, + new Path(PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT)); + long startTimeNs; if (configuration.containsKey(PcapOptions.START_TIME_NS.getKey())) { - startTime = PcapOptions.START_TIME_NS.get(configuration, Long.class); + startTimeNs = PcapOptions.START_TIME_NS.getOrDefault(configuration, Long.class, 0L); } else { - startTime = PcapOptions.START_TIME_MS.get(configuration, Long.class) * 1000000; + startTimeNs = TimestampConverters.MILLISECONDS.toNanoseconds(PcapOptions.START_TIME_MS.getOrDefault(configuration, Long.class, 0L)); } - long endTime; + long endTimeNs; if (configuration.containsKey(PcapOptions.END_TIME_NS.getKey())) { - endTime = PcapOptions.END_TIME_NS.get(configuration, Long.class); + endTimeNs = PcapOptions.END_TIME_NS.getOrDefault(configuration, Long.class, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis())); } else { - endTime = PcapOptions.END_TIME_MS.get(configuration, Long.class) * 1000000; + endTimeNs = TimestampConverters.MILLISECONDS.toNanoseconds(PcapOptions.END_TIME_MS.getOrDefault(configuration, Long.class, System.currentTimeMillis())); } - int numReducers = PcapOptions.NUM_REDUCERS.get(configuration, Integer.class); + int numReducers = PcapOptions.NUM_REDUCERS.getOrDefault(configuration, Integer.class, NUM_REDUCERS_DEFAULT); T fields = (T) PcapOptions.FIELDS.get(configuration, Object.class); PcapFilterConfigurator<T> filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class); @@ -237,8 +242,8 @@ public class PcapJob<T> implements Statusable<Path> { Statusable<Path> statusable = query(jobName, basePath, baseInterimResultPath, - startTime, - endTime, + startTimeNs, + endTimeNs, numReducers, fields, // create a new copy for each job, bad things happen when hadoop config is reused