http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalSummarizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalSummarizer.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalSummarizer.java
new file mode 100644
index 0000000..7042e86
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalSummarizer.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.dataloads.extractor.ExtractorCapabilities;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.extractor.StatefulExtractor;
+import org.apache.metron.dataloads.nonbulk.flatfile.SummarizeOptions;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writer;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writers;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * In-process summarizer: input lines are folded (via {@link #extract}) into per-thread states of a
+ * {@link StatefulExtractor}; once the import finishes the states are merged and the result is handed
+ * to the configured {@link Writer}.
+ */
+public class LocalSummarizer extends AbstractLocalImporter<SummarizeOptions, LocalSummarizer.SummarizationState> {
+  // Every state created by createState during the current import; merged at the end of importData.
+  List<SummarizationState> stateList;
+
+  public LocalSummarizer() {
+    stateList = Collections.synchronizedList(new ArrayList<>());
+  }
+
+  /**
+   * A {@link StatefulExtractor} paired with the state it accumulates into.
+   */
+  public static class SummarizationState {
+    AtomicReference<Object> state;
+    StatefulExtractor extractor;
+    public SummarizationState(StatefulExtractor extractor, Object initState) {
+      this.state = new AtomicReference<>(initState);
+      this.extractor = extractor;
+    }
+
+    public AtomicReference<Object> getState() {
+      return state;
+    }
+
+    public StatefulExtractor getExtractor() {
+      return extractor;
+    }
+
+  }
+
+  /** Reads {@code QUIET}; defaults to {@code false}. */
+  @Override
+  protected boolean isQuiet(EnumMap<SummarizeOptions, Optional<Object>> config) {
+    return (boolean) config.getOrDefault(SummarizeOptions.QUIET, Optional.of(false)).get();
+  }
+
+  /** Reads {@code BATCH_SIZE}; defaults to 1. */
+  @Override
+  protected int batchSize(EnumMap<SummarizeOptions, Optional<Object>> config) {
+    return (int) config.getOrDefault(SummarizeOptions.BATCH_SIZE, Optional.of(1)).get();
+  }
+
+  /**
+   * Uses the configured thread count only when the extractor can merge states; otherwise all lines
+   * must go into a single state, which requires a single thread.
+   */
+  @Override
+  protected int numThreads(EnumMap<SummarizeOptions, Optional<Object>> config, ExtractorHandler handler) {
+    if(handler.getExtractor().getCapabilities().contains(ExtractorCapabilities.MERGEABLE)) {
+      // Fall back to one thread when NUM_THREADS is missing/empty instead of failing with an NPE.
+      return (int) config.getOrDefault(SummarizeOptions.NUM_THREADS, Optional.of(1)).orElse(1);
+    }
+    else {
+      //force one thread in the case it's not mergeable.
+      return 1;
+    }
+  }
+
+  /**
+   * Requires a {@link StatefulExtractor} with the STATEFUL capability and an OUTPUT option.
+   *
+   * @throws IllegalStateException if the extractor cannot be used for summarization
+   */
+  @Override
+  protected void validateState(EnumMap<SummarizeOptions, Optional<Object>> config, ExtractorHandler handler) {
+    if(!(handler.getExtractor() instanceof StatefulExtractor)){
+      throw new IllegalStateException("Extractor must be a stateful extractor and " + handler.getExtractor().getClass().getName() + " is not.");
+    }
+    assertOption(config, SummarizeOptions.OUTPUT);
+    if(!handler.getExtractor().getCapabilities().contains(ExtractorCapabilities.STATEFUL)) {
+      throw new IllegalStateException("Unable to operate on a non-stateful extractor. " +
+              "If you have not specified \"stateUpdate\" in your Extractor config, there is nothing to do here and nothing will be written.");
+    }
+
+  }
+
+  /**
+   * Gives each thread its own lazily-initialized state and records it in {@link #stateList} so the
+   * per-thread results can be merged after the import.
+   */
+  @Override
+  protected ThreadLocal<SummarizationState> createState(EnumMap<SummarizeOptions, Optional<Object>> config, Configuration hadoopConfig, ExtractorHandler handler) {
+    final StatefulExtractor extractor = (StatefulExtractor)handler.getExtractor();
+    return ThreadLocal.withInitial(() -> {
+      Object initState = extractor.initializeState(handler.getConfig());
+      SummarizationState ret = new SummarizationState(extractor, initState);
+      stateList.add(ret);
+      return ret;
+    });
+  }
+
+
+  /** Folds one input line into the calling thread's state. */
+  @Override
+  protected void extract(SummarizationState state, String line) throws IOException {
+    state.getExtractor().extract(line, state.getState());
+  }
+
+  /**
+   * Validates the output, runs the import, merges the per-thread states and writes the result.
+   *
+   * @throws InvalidWriterOutput if the writer rejects the OUTPUT location
+   */
+  @Override
+  public void importData(EnumMap<SummarizeOptions, Optional<Object>> config, ExtractorHandler handler, Configuration hadoopConfig) throws IOException, InvalidWriterOutput {
+    Writer writer = (Writer) config.get(SummarizeOptions.OUTPUT_MODE).get();
+    Optional<String> fileName = Optional.ofNullable((String)config.getOrDefault(SummarizeOptions.OUTPUT, Optional.empty()).orElse(null));
+    writer.validate(fileName, hadoopConfig);
+    // Drop states from any previous run on this instance (e.g. the shared Summarizers.LOCAL instance);
+    // otherwise they would be merged into this run's result.
+    stateList.clear();
+    super.importData(config, handler, hadoopConfig);
+    StatefulExtractor extractor = (StatefulExtractor) handler.getExtractor();
+    writer.write(collectFinalState(extractor), fileName, hadoopConfig);
+  }
+
+  /**
+   * Returns null when no state was created, the sole state when there is one, and the extractor's
+   * merge of all states otherwise.
+   */
+  private Object collectFinalState(StatefulExtractor extractor) {
+    List<Object> states = new ArrayList<>();
+    // Collections.synchronizedList requires manual locking while iterating.
+    synchronized (stateList) {
+      for(SummarizationState s : stateList) {
+        states.add(s.getState().get());
+      }
+    }
+    if(states.isEmpty()) {
+      return null;
+    }
+    if(states.size() == 1) {
+      return states.get(0);
+    }
+    return extractor.mergeStates(states);
+  }
+
+  /**
+   * Returns the INPUT option as a list: empty when absent, a singleton for a single string,
+   * otherwise the configured list itself.
+   */
+  @Override
+  protected List<String> getInputs(EnumMap<SummarizeOptions, Optional<Object>> config) {
+    Object o = config.getOrDefault(SummarizeOptions.INPUT, Optional.empty()).orElse(null);
+    if(o == null) {
+      return new ArrayList<>();
+    }
+    if(o instanceof String) {
+      return ImmutableList.of((String)o);
+    }
+    @SuppressWarnings("unchecked")
+    List<String> inputs = (List<String>) o;
+    return inputs;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
index 401ace2..1b34ed4 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
@@ -38,7 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public enum MapReduceImporter implements Importer{
+public enum MapReduceImporter implements Importer<LoadOptions> {
   INSTANCE
   ;
 
http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Summarizers.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Summarizers.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Summarizers.java
new file mode 100644
index 0000000..180aa23
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Summarizers.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import java.util.Optional;
+
+/**
+ * Available summarization strategies. Each constant wraps one importer instance that is created
+ * once, together with the enum constant.
+ */
+public enum Summarizers {
+  LOCAL(new LocalSummarizer());
+
+  private final Importer importer;
+
+  Summarizers(Importer importer) {
+    this.importer = importer;
+  }
+
+  /** @return the importer backing this strategy */
+  public Importer getSummarizer() {
+    return importer;
+  }
+
+  /**
+   * Looks a strategy up by name, ignoring case and surrounding whitespace.
+   *
+   * @return the matching strategy, or empty for a null or unknown name
+   */
+  public static Optional<Summarizers> getStrategy(String strategyName) {
+    if (strategyName == null) {
+      return Optional.empty();
+    }
+    final String wanted = strategyName.trim();
+    for (Summarizers candidate : values()) {
+      if (candidate.name().equalsIgnoreCase(wanted)) {
+        return Optional.of(candidate);
+      }
+    }
+    return Optional.empty();
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/ConsoleWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/ConsoleWriter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/ConsoleWriter.java
new file mode 100644
index 0000000..22f4aa1
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/ConsoleWriter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation
(ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.utils.SerDeUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * {@link Writer} that prints to standard out; the output location and Hadoop configuration are ignored.
+ */
+public class ConsoleWriter implements Writer {
+  @Override
+  public void validate(Optional<String> output, Configuration hadoopConfig) {
+    // Intentionally empty: printing to stdout needs no output location.
+  }
+
+  /** Prints the object directly via {@code println(Object)}, so {@code null} prints as "null". */
+  @Override
+  public void write(Object obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+    System.out.println(obj);
+  }
+
+  /** Deserializes the bytes with {@link SerDeUtils} and prints the resulting object. */
+  @Override
+  public void write(byte[] obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+    Object decoded = SerDeUtils.fromBytes(obj, Object.class);
+    System.out.println(decoded);
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/HDFSWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/HDFSWriter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/HDFSWriter.java
new file mode
100644
index 0000000..1c0c726
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/HDFSWriter.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * {@link Writer} that stores the serialized object in a file on a Hadoop filesystem.
+ */
+public class HDFSWriter implements Writer {
+  /**
+   * Rejects a missing output, an empty name, "." or "..", and any name ending in "/".
+   *
+   * @throws InvalidWriterOutput if the output cannot name a file
+   */
+  @Override
+  public void validate(Optional<String> fileNameOptional, Configuration hadoopConfig) throws InvalidWriterOutput {
+    if(!fileNameOptional.isPresent()) {
+      throw new InvalidWriterOutput("Filename is not present.");
+    }
+    String fileName = fileNameOptional.get();
+    if(StringUtils.isEmpty(fileName) || fileName.trim().equals(".") || fileName.trim().equals("..") || fileName.trim().endsWith("/")) {
+      throw new InvalidWriterOutput("Filename is empty or otherwise invalid.");
+    }
+  }
+
+  /**
+   * Writes the bytes to {@code output} (which must be present; see {@link #validate}).
+   * The filesystem is resolved from the path itself, so fully-qualified paths (another scheme or
+   * cluster) work, while scheme-less paths still use the configuration's default filesystem.
+   */
+  @Override
+  public void write(byte[] obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+    Path path = new Path(output.get());
+    // Not closed here: Hadoop caches and shares FileSystem instances.
+    FileSystem fs = path.getFileSystem(hadoopConfig);
+    try(FSDataOutputStream stream = fs.create(path)) {
+      IOUtils.write(obj, stream);
+      stream.flush();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/InvalidWriterOutput.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/InvalidWriterOutput.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/InvalidWriterOutput.java
new file mode 100644
index 0000000..7c237c8
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/InvalidWriterOutput.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+/**
+ * Checked exception thrown by {@link Writer#validate} when the requested output location is unusable.
+ */
+public class InvalidWriterOutput extends Exception {
+  /**
+   * @param message why the output is invalid
+   * @param t the underlying cause
+   */
+  public InvalidWriterOutput(String message, Throwable t) {
+    super(message, t);
+  }
+
+  /**
+   * @param message why the output is invalid
+   */
+  public InvalidWriterOutput(String message) {
+    super(message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/LocalWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/LocalWriter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/LocalWriter.java
new file mode 100644
index 0000000..d8bda81
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/LocalWriter.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * {@link Writer} that stores the serialized object in a file on the local filesystem.
+ */
+public class LocalWriter implements Writer {
+
+  /**
+   * Rejects a missing output, an empty name, "." or "..", and any name ending in "/".
+   *
+   * @throws InvalidWriterOutput if the output cannot name a file
+   */
+  @Override
+  public void validate(Optional<String> fileNameOptional, Configuration hadoopConfig) throws InvalidWriterOutput {
+    if(!fileNameOptional.isPresent()) {
+      throw new InvalidWriterOutput("Filename is not present.");
+    }
+    String fileName = fileNameOptional.get();
+    if(StringUtils.isEmpty(fileName) || fileName.trim().equals(".") || fileName.trim().equals("..") || fileName.trim().endsWith("/")) {
+      throw new InvalidWriterOutput("Filename is empty or otherwise invalid.");
+    }
+  }
+
+  /**
+   * Writes the bytes to {@code output} (which must be present; see {@link #validate}), creating
+   * missing parent directories and truncating any existing file.
+   *
+   * @throws IOException if the parent directory cannot be created or the file cannot be written
+   */
+  @Override
+  public void write(byte[] obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+    File outFile = new File(output.get());
+    // A bare name such as "out.ser" has no parent component: getParentFile() returns null.
+    File parent = outFile.getParentFile();
+    if(parent != null && !parent.exists() && !parent.mkdirs() && !parent.isDirectory()) {
+      throw new IOException("Unable to create directory " + parent);
+    }
+    try(FileOutputStream fs = new FileOutputStream(outFile)) {
+      IOUtils.write(obj, fs);
+      fs.flush();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writer.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writer.java
new file mode 100644
index 0000000..ba13ba1
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor
license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.utils.SerDeUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Destination for a summarization result.
+ */
+public interface Writer {
+  /**
+   * Checks that {@code output} is usable by this writer.
+   *
+   * @throws InvalidWriterOutput if the output location is missing or invalid
+   */
+  void validate(Optional<String> output, Configuration hadoopConfig) throws InvalidWriterOutput;
+
+  /**
+   * Serializes {@code obj} with {@link SerDeUtils#toBytes} and passes the bytes to
+   * {@link #write(byte[], Optional, Configuration)}; a {@code null} object writes nothing.
+   */
+  default void write(Object obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+    if(obj == null) {
+      return;
+    }
+    byte[] serialized = SerDeUtils.toBytes(obj);
+    write(serialized, output, hadoopConfig);
+  }
+
+  /** Writes already-serialized bytes to {@code output}. */
+  void write(byte[] obj, Optional<String> output, Configuration hadoopConfig) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writers.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writers.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writers.java
new file mode 100644
index 0000000..785ad21
--- /dev/null
+++
b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writers.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Named output modes. Each constant is itself a {@link Writer} that forwards every call to the
+ * writer it wraps.
+ */
+public enum Writers implements Writer {
+  LOCAL(new LocalWriter()),
+  HDFS(new HDFSWriter()),
+  CONSOLE(new ConsoleWriter())
+  ;
+  private final Writer writer;
+
+  Writers(Writer writer) {
+    this.writer = writer;
+  }
+
+  /**
+   * Looks an output mode up by name, ignoring case and surrounding whitespace.
+   *
+   * @return the matching mode, or empty for a null or unknown name
+   */
+  public static Optional<Writers> getStrategy(String strategyName) {
+    if(strategyName == null) {
+      return Optional.empty();
+    }
+    for(Writers strategy : values()) {
+      if(strategy.name().equalsIgnoreCase(strategyName.trim())) {
+        return Optional.of(strategy);
+      }
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public void validate(Optional<String> output, Configuration hadoopConf) throws InvalidWriterOutput {
+    writer.validate(output, hadoopConf);
+  }
+
+  /**
+   * Forwards object writes as well, so a wrapped writer's own {@code write(Object, ...)} (e.g.
+   * ConsoleWriter printing the object directly) is used instead of the interface default, which
+   * would serialize the object and route it through {@code write(byte[], ...)}.
+   */
+  @Override
+  public void write(Object obj, Optional<String> output, Configuration hadoopConf) throws IOException {
+    writer.write(obj, output, hadoopConf);
+  }
+
+  @Override
+  public void write(byte[] obj, Optional<String> output, Configuration hadoopConf) throws IOException {
+    writer.write(obj, output, hadoopConf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/scripts/flatfile_summarizer.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/scripts/flatfile_summarizer.sh b/metron-platform/metron-data-management/src/main/scripts/flatfile_summarizer.sh
new file mode 100755
index 0000000..018d61a
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/scripts/flatfile_summarizer.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Launches SimpleFlatFileSummarizer: through `hadoop jar` (HBase classpath shipped via -libjars)
+# when a hadoop client is installed, otherwise through plain `java`.
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" ] && [ -r "${BIGTOP_DEFAULTS_DIR}/hbase" ] && . "${BIGTOP_DEFAULTS_DIR}/hbase"
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+# ${project.*} placeholders are substituted at build time.
+export METRON_VERSION=${project.version}
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export CLASSNAME="org.apache.metron.dataloads.nonbulk.flatfile.SimpleFlatFileSummarizer"
+export DM_JAR=${project.artifactId}-$METRON_VERSION.jar
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+export HADOOP_OPTS="$HADOOP_OPTS $METRON_JVMFLAGS"
+# `command -v` is the portable existence test and does not depend on what `which` prints.
+if command -v hadoop >/dev/null 2>&1
+then
+  HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
+  for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+    if [ -f "$jar" ];then
+      LIBJARS="$jar,$LIBJARS"
+    fi
+  done
+  export HADOOP_CLASSPATH
+  hadoop jar "$METRON_HOME/lib/$DM_JAR" $CLASSNAME -libjars ${LIBJARS} "$@"
+else
+  echo "Warning: Metron cannot find the hadoop client on this node. This means that loading via Map Reduce will NOT function."
+  CP=$METRON_HOME/lib/$DM_JAR:/usr/metron/${METRON_VERSION}/lib/taxii-1.1.0.1.jar:`${HBASE_HOME}/bin/hbase classpath`
+  # Quoted so classpath wildcard entries reach the JVM instead of being expanded by the shell.
+  java $METRON_JVMFLAGS -cp "$CP" $CLASSNAME "$@"
+fi
+
http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizerTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizerTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizerTest.java
new file mode 100644
index 0000000..17e3206
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizerTest.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.LocalSummarizer;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.Location;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.RawLocation;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writer;
+import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests {@link SummarizeOptions} parsing and runs {@link LocalSummarizer} end to end over in-memory
+ * inputs, capturing the summary with a writer instead of persisting it.
+ */
+public class SimpleFlatFileSummarizerTest {
+  // Extractor config (read by @Multiline from the comment below): CSV lines are folded into a multiset of TLD-less domains.
+  /**
+   {
+     "config" : {
+       "columns" : {
+         "rank" : 0,
+         "domain" : 1
+       },
+       "value_transform" : {
+         "domain" : "DOMAIN_REMOVE_TLD(domain)"
+       },
+       "value_filter" : "LENGTH(domain) > 0",
+       "state_init" : "MULTISET_INIT()",
+       "state_update" : {
+         "state" : "MULTISET_ADD(state, domain)"
+       },
+       "state_merge" : "MULTISET_MERGE(states)",
+       "separator" : ","
+     },
+     "extractor" : "CSV"
+   }
+   */
+  @Multiline
+  public static String stellarExtractorConfigLineByLine;
+
+  // Same extractor, but each input file is read as a whole (inputFormat WHOLE_FILE).
+  /**
+   {
+     "config" : {
+       "columns" : {
+         "rank" : 0,
+         "domain" : 1
+       },
+       "value_transform" : {
+         "domain" : "DOMAIN_REMOVE_TLD(domain)"
+       },
+       "value_filter" : "LENGTH(domain) > 0",
+       "state_init" : "MULTISET_INIT()",
+       "state_update" : {
+         "state" : "MULTISET_ADD(state, domain)"
+       },
+       "state_merge" : "MULTISET_MERGE(states)",
+       "separator" : ","
+     },
+     "extractor" : "CSV",
+     "inputFormat" : "WHOLE_FILE"
+   }
+   */
+  @Multiline
+  public static String stellarExtractorConfigWholeFile;
+
+
+  public static List<String> domains = ImmutableList.of(
+          "google.com",
+          "youtube.com",
+          "facebook.com",
+          "baidu.com",
+          "wikipedia.org",
+          "yahoo.com",
+          "google.co.in",
+          "reddit.com",
+          "qq.com",
+          "amazon.com",
+          "taobao.com",
+          "tmall.com",
+          "twitter.com",
+          "live.com",
+          "vk.com",
+          "google.co.jp",
+          "instagram.com",
+          "sohu.com",
+          "sina.com.cn",
+          "jd.com"
+  );
+
+  /** Builds a "rank,domain" CSV (one line per entry of {@link #domains}, ranks 1..n). */
+  public static String generateData() {
+    List<String> tmp = new ArrayList<>();
+    int i = 1;
+    for(String d : domains) {
+      tmp.add(i++ + "," + d);
+    }
+    return Joiner.on("\n").join(tmp);
+  }
+
+  @Test
+  public void testArgs() throws Exception {
+    String[] argv = { "-e extractor.json"
+            , "-o out.ser"
+            , "-l log4j", "-i input.csv"
+            , "-p 2", "-b 128", "-q"
+    };
+
+    Configuration config = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(config, argv).getRemainingArgs();
+
+    CommandLine cli = SummarizeOptions.parse(new PosixParser(), otherArgs);
+    Assert.assertEquals("extractor.json", SummarizeOptions.EXTRACTOR_CONFIG.get(cli).trim());
+    Assert.assertEquals("input.csv", SummarizeOptions.INPUT.get(cli).trim());
+    Assert.assertEquals("log4j", SummarizeOptions.LOG4J_PROPERTIES.get(cli).trim());
+    Assert.assertEquals("2", SummarizeOptions.NUM_THREADS.get(cli).trim());
+    Assert.assertEquals("128", SummarizeOptions.BATCH_SIZE.get(cli).trim());
+  }
+
+  /** {@link RawLocation} backed by a name-to-content map; "." is the only directory and lists every entry. */
+  public static class InMemoryLocation implements RawLocation {
+    Map<String, String> inMemoryData;
+    public InMemoryLocation(Map<String, String> inMemoryData)
+    {
+      this.inMemoryData = inMemoryData;
+    }
+
+    @Override
+    public Optional<List<String>> list(String loc) throws IOException {
+      if(loc.equals(".")) {
+        ArrayList<String> ret = new ArrayList<>(inMemoryData.keySet());
+        return Optional.of(ret);
+      }
+      return Optional.empty();
+    }
+
+    @Override
+    public boolean exists(String loc) {
+      return loc.equals(".") || inMemoryData.containsKey(loc);
+    }
+
+    @Override
+    public boolean isDirectory(String loc) throws IOException {
+      return loc.equals(".");
+    }
+
+    @Override
+    public InputStream openInputStream(String loc) throws IOException {
+      return new ByteArrayInputStream(inMemoryData.get(loc).getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public boolean match(String loc) {
+      return exists(loc);
+    }
+  }
+
+  /** LocalSummarizer whose inputs resolve to {@link InMemoryLocation}s; "." expands to every mock entry. */
+  public static class MockSummarizer extends LocalSummarizer {
+    Map<String, String> mockData;
+    public MockSummarizer(Map<String, String> mockData) {
+      this.mockData = mockData;
+    }
+
+    @Override
+    protected List<Location> getLocationsRecursive(List<String> inputs, FileSystem fs) throws IOException {
+      Set<Location> ret = new HashSet<>();
+      for(String input : inputs) {
+        if(input.equals(".")) {
+          for(String s : mockData.keySet()) {
+            ret.add(resolveLocation(s, fs));
+          }
+        }
+        else {
+          ret.add(resolveLocation(input, fs));
+        }
+      }
+      return new ArrayList<>(ret);
+    }
+
+    @Override
+    protected Location resolveLocation(String input, FileSystem fs) {
+      return new Location(input, new InMemoryLocation(mockData));
+    }
+  }
+
+  /** Writer that records the object it is asked to write instead of persisting it. */
+  public static class PeekingWriter implements Writer {
+    AtomicReference<Object> ref;
+    public PeekingWriter(AtomicReference<Object> ref) {
+      this.ref = ref;
+    }
+
+    @Override
+    public void validate(Optional<String> output, Configuration hadoopConfig) {
+
+    }
+    @Override
+    public void write(Object obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+      ref.set(obj);
+    }
+
+    @Override
+    public void write(byte[] obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+
+    }
+  }
+
+  @Test
+  public void testLineByLine() throws IOException, InvalidWriterOutput {
+    testLineByLine(5);
+    testLineByLine(1);
+  }
+
+  public void testLineByLine(final int numThreads) throws IOException, InvalidWriterOutput {
+    ExtractorHandler handler = ExtractorHandler.load(stellarExtractorConfigLineByLine);
+    LocalSummarizer summarizer = new MockSummarizer(
+            ImmutableMap.of("input.csv", generateData())
+    );
+    final AtomicReference<Object> finalObj = new AtomicReference<>(null);
+    summarizer.importData(summarizeOptions("input.csv", finalObj, numThreads), handler, new Configuration());
+    assertAllDomainsFound(finalObj.get());
+  }
+
+  @Test
+  public void testWholeFile() throws Exception {
+    testWholeFile(5);
+    testWholeFile(1);
+  }
+
+  public void testWholeFile(final int numThreads) throws IOException, InvalidWriterOutput {
+    ExtractorHandler handler = ExtractorHandler.load(stellarExtractorConfigWholeFile);
+    // One single-line file per domain.
+    Map<String, String> files = new HashMap<>();
+    for(String domain : domains) {
+      files.put(domain, "1," + domain);
+    }
+    LocalSummarizer summarizer = new MockSummarizer(files);
+    final AtomicReference<Object> finalObj = new AtomicReference<>(null);
+    summarizer.importData(summarizeOptions(".", finalObj, numThreads), handler, new Configuration());
+    assertAllDomainsFound(finalObj.get());
+  }
+
+  // Options shared by the end-to-end tests; only the input and thread count vary.
+  private static EnumMap<SummarizeOptions, Optional<Object>> summarizeOptions(String input, AtomicReference<Object> sink, int numThreads) {
+    EnumMap<SummarizeOptions, Optional<Object>> options = new EnumMap<>(SummarizeOptions.class);
+    options.put(SummarizeOptions.INPUT, Optional.of(input));
+    options.put(SummarizeOptions.BATCH_SIZE, Optional.of(5));
+    options.put(SummarizeOptions.QUIET, Optional.of(true));
+    options.put(SummarizeOptions.OUTPUT_MODE, Optional.of(new PeekingWriter(sink)));
+    options.put(SummarizeOptions.OUTPUT, Optional.of("out"));
+    options.put(SummarizeOptions.NUM_THREADS, Optional.of(numThreads));
+    return options;
+  }
+
+  // Every domain (with its TLD removed) must have a positive count in the summarized multiset.
+  private static void assertAllDomainsFound(Object summary) {
+    String expr = "MAP_GET(DOMAIN_REMOVE_TLD(domain), s) > 0";
+    for(String domain : domains) {
+      Boolean b = (Boolean)StellarProcessorUtils.run(expr, ImmutableMap.of("s", summary, "domain", domain));
+      Assert.assertTrue("Can't find " + domain, b);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java
index ab8ced1..1d2655d 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java
@@ -18,6 +18,7 @@
 package org.apache.metron.stellar.dsl.functions;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import
com.google.common.collect.Iterables; import com.google.common.net.InternetDomainName; @@ -229,7 +230,13 @@ public class NetworkFunctions { private static String extractTld(InternetDomainName idn, String dn) { if(idn != null && idn.hasPublicSuffix()) { - return idn.publicSuffix().toString(); + String ret = idn.publicSuffix().toString(); + if(ret.startsWith("InternetDomainName")) { + return Joiner.on(".").join(idn.publicSuffix().parts()); + } + else { + return ret; + } } else if(dn != null) { StringBuffer tld = new StringBuffer("");
