http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java b/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java new file mode 100644 index 0000000..8259ce7 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java @@ -0,0 +1,488 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.utils; + +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.*; +import org.apache.spark.mllib.feature.IDF; +import org.apache.spark.mllib.feature.IDFModel; +import org.apache.spark.mllib.linalg.*; +import org.apache.spark.mllib.linalg.distributed.IndexedRow; +import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix; +import org.apache.spark.mllib.linalg.distributed.RowMatrix; +import scala.Tuple2; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Stream; + +/** + * Matrix utility tool + */ +public class MatrixUtil { + + private MatrixUtil() { + } + + /** + * buildSVDMatrix: Generate SVD matrix from TF-IDF matrix. Please make sure + * the TF-IDF matrix has been already built from the original documents. + * + * @param tfidfMatrix, + * each row is a term and each column is a document name and each + * cell is the TF-IDF value of the term in the corresponding + * document. + * @param dimension + * Column number of the SVD matrix + * @return RowMatrix, each row is a term and each column is a dimension in the + * feature space, each cell is value of the term in the corresponding + * dimension. + */ + public static RowMatrix buildSVDMatrix(RowMatrix tfidfMatrix, int dimension) { + int matrixCol = (int) tfidfMatrix.numCols(); + if (matrixCol < dimension) { + dimension = matrixCol; + } + + SingularValueDecomposition<RowMatrix, Matrix> svd = tfidfMatrix.computeSVD(dimension, true, 1.0E-9d); + RowMatrix u = svd.U(); + Vector s = svd.s(); + return u.multiply(Matrices.diag(s)); + } + + /** + * buildSVDMatrix: Generate SVD matrix from Vector RDD. + * + * @param vecRDD + * vectors of terms in feature space + * @param dimension + * Column number of the SVD matrix + * @return RowMatrix, each row is a term and each column is a dimension in the + * feature space, each cell is value of the term in the corresponding + * dimension. 
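+ * <p>Both overloads return U multiplied by diag(s), that is, each column of U rescaled by its singular value, which is the usual LSA projection. An illustrative call (a sketch only; vecRDD is an assumed, pre-built JavaRDD&lt;Vector&gt;):</p> + * <pre> + * RowMatrix lsa = MatrixUtil.buildSVDMatrix(vecRDD, 50); + * </pre>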
+ */ + public static RowMatrix buildSVDMatrix(JavaRDD<Vector> vecRDD, int dimension) { + RowMatrix tfidfMatrix = new RowMatrix(vecRDD.rdd()); + SingularValueDecomposition<RowMatrix, Matrix> svd = tfidfMatrix.computeSVD(dimension, true, 1.0E-9d); + RowMatrix u = svd.U(); + Vector s = svd.s(); + return u.multiply(Matrices.diag(s)); + } + + /** + * Create TF-IDF matrix from word-doc matrix. + * + * @param wordDocMatrix, + * each row is a term, each column is a document name and each cell + * is number of the term in the corresponding document. + * @return RowMatrix, each row is a term and each column is a document name + * and each cell is the TF-IDF value of the term in the corresponding + * document. + */ + public static RowMatrix createTFIDFMatrix(RowMatrix wordDocMatrix) { + JavaRDD<Vector> newcountRDD = wordDocMatrix.rows().toJavaRDD(); + IDFModel idfModel = new IDF().fit(newcountRDD); + JavaRDD<Vector> idf = idfModel.transform(newcountRDD); + return new RowMatrix(idf.rdd()); + } + + /** + * Create matrix from doc-terms JavaPairRDD. + * + * @param uniqueDocRDD + * doc-terms JavaPairRDD, in which each key is a doc name, and value + * is term list extracted from that doc + * @return LabeledRowMatrix {@link LabeledRowMatrix} + */ + public static LabeledRowMatrix createWordDocMatrix(JavaPairRDD<String, List<String>> uniqueDocRDD) { + // Index documents with unique IDs + JavaPairRDD<List<String>, Long> corpus = uniqueDocRDD.values().zipWithIndex(); + // cal word-doc numbers + JavaPairRDD<Tuple2<String, Long>, Double> worddocNumRDD = corpus.flatMapToPair(new PairFlatMapFunction<Tuple2<List<String>, Long>, Tuple2<String, Long>, Double>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Iterator<Tuple2<Tuple2<String, Long>, Double>> call(Tuple2<List<String>, Long> docwords) throws Exception { + List<Tuple2<Tuple2<String, Long>, Double>> pairs = new ArrayList<>(); + List<String> words = docwords._1; + int n = words.size(); + for (int i = 0; i < n; i++) { + Tuple2<String, Long> worddoc = new Tuple2<>(words.get(i), docwords._2); + pairs.add(new Tuple2<Tuple2<String, Long>, Double>(worddoc, 1.0)); + } + return pairs.iterator(); + } + }).reduceByKey(new Function2<Double, Double, Double>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Double call(Double first, Double second) throws Exception { + return first + second; + } + }); + // cal word doc-numbers + JavaPairRDD<String, Tuple2<List<Long>, List<Double>>> wordDocnumRDD = worddocNumRDD + .mapToPair(new PairFunction<Tuple2<Tuple2<String, Long>, Double>, String, Tuple2<List<Long>, List<Double>>>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, Tuple2<List<Long>, List<Double>>> call(Tuple2<Tuple2<String, Long>, Double> worddocNum) throws Exception { + List<Long> docs = new ArrayList<>(); + docs.add(worddocNum._1._2); + List<Double> nums = new ArrayList<>(); + nums.add(worddocNum._2); + Tuple2<List<Long>, List<Double>> docmums = new Tuple2<>(docs, nums); + return new Tuple2<>(worddocNum._1._1, docmums); + } + }); + // trans to vector + final int corporsize = (int) uniqueDocRDD.keys().count(); + JavaPairRDD<String, Vector> wordVectorRDD = wordDocnumRDD.reduceByKey(new Function2<Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<List<Long>, List<Double>> 
call(Tuple2<List<Long>, List<Double>> arg0, Tuple2<List<Long>, List<Double>> arg1) throws Exception { + arg0._1.addAll(arg1._1); + arg0._2.addAll(arg1._2); + return new Tuple2<>(arg0._1, arg0._2); + } + }).mapToPair(new PairFunction<Tuple2<String, Tuple2<List<Long>, List<Double>>>, String, Vector>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, Vector> call(Tuple2<String, Tuple2<List<Long>, List<Double>>> arg0) throws Exception { + int docsize = arg0._2._1.size(); + int[] intArray = new int[docsize]; + double[] doubleArray = new double[docsize]; + for (int i = 0; i < docsize; i++) { + intArray[i] = arg0._2._1.get(i).intValue(); + doubleArray[i] = arg0._2._2.get(i).intValue(); + } + Vector sv = Vectors.sparse(corporsize, intArray, doubleArray); + return new Tuple2<>(arg0._1, sv); + } + }); + + RowMatrix wordDocMatrix = new RowMatrix(wordVectorRDD.values().rdd()); + + LabeledRowMatrix labeledRowMatrix = new LabeledRowMatrix(); + labeledRowMatrix.rowMatrix = wordDocMatrix; + labeledRowMatrix.rowkeys = wordVectorRDD.keys().collect(); + labeledRowMatrix.colkeys = uniqueDocRDD.keys().collect(); + return labeledRowMatrix; + } + + public static LabeledRowMatrix createDocWordMatrix(JavaPairRDD<String, List<String>> uniqueDocRDD, JavaSparkContext sc) { + // Index word with unique IDs + JavaPairRDD<String, Long> wordIDRDD = uniqueDocRDD.values().flatMap(new FlatMapFunction<List<String>, String>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Iterator<String> call(List<String> arg0) throws Exception { + return arg0.iterator(); + } + }).distinct().zipWithIndex(); + + // + JavaPairRDD<Tuple2<String, String>, Double> docwordNumRDD = uniqueDocRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, List<String>>, Tuple2<String, String>, Double>() { + + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Iterator<Tuple2<Tuple2<String, String>, Double>> call(Tuple2<String, List<String>> docwords) throws Exception { + List<Tuple2<Tuple2<String, String>, Double>> pairs = new ArrayList<>(); + List<String> words = docwords._2; + int n = words.size(); + for (int i = 0; i < n; i++) { + Tuple2<String, String> worddoc = new Tuple2<>(docwords._1, words.get(i)); + pairs.add(new Tuple2<Tuple2<String, String>, Double>(worddoc, 1.0)); + } + return pairs.iterator(); + } + }).reduceByKey(new Function2<Double, Double, Double>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Double call(Double first, Double second) throws Exception { + return first + second; + } + }); + + // + JavaPairRDD<String, Tuple2<String, Double>> wordDocnumRDD = docwordNumRDD.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Double>, String, Tuple2<String, Double>>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, Tuple2<String, Double>> call(Tuple2<Tuple2<String, String>, Double> arg0) throws Exception { + + Tuple2<String, Double> wordmums = new Tuple2<>(arg0._1._1, arg0._2); + return new Tuple2<>(arg0._1._2, wordmums); + } + }); + + // + + JavaPairRDD<String, Tuple2<Tuple2<String, Double>, Optional<Long>>> testRDD = wordDocnumRDD.leftOuterJoin(wordIDRDD); + + int wordsize = (int) wordIDRDD.count(); + JavaPairRDD<String, Vector> docVectorRDD = testRDD.mapToPair(new PairFunction<Tuple2<String, Tuple2<Tuple2<String, Double>, Optional<Long>>>, String, Tuple2<List<Long>, List<Double>>>() { + /** + * + */ + 
private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, Tuple2<List<Long>, List<Double>>> call(Tuple2<String, Tuple2<Tuple2<String, Double>, Optional<Long>>> arg0) throws Exception { + Optional<Long> oid = arg0._2._2; + Long wordId = (long) 0; + if (oid.isPresent()) { + wordId = oid.get(); + } + + List<Long> word = new ArrayList<>(); + word.add(wordId); + + List<Double> count = new ArrayList<>(); + count.add(arg0._2._1._2); + + Tuple2<List<Long>, List<Double>> wordcount = new Tuple2<>(word, count); + + return new Tuple2<>(arg0._2._1._1, wordcount); + } + + }).reduceByKey(new Function2<Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<List<Long>, List<Double>> call(Tuple2<List<Long>, List<Double>> arg0, Tuple2<List<Long>, List<Double>> arg1) throws Exception { + arg0._1.addAll(arg1._1); + arg0._2.addAll(arg1._2); + return new Tuple2<>(arg0._1, arg0._2); + } + }).mapToPair(new PairFunction<Tuple2<String, Tuple2<List<Long>, List<Double>>>, String, Vector>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, Vector> call(Tuple2<String, Tuple2<List<Long>, List<Double>>> arg0) throws Exception { + int docsize = arg0._2._1.size(); + int[] intArray = new int[docsize]; + double[] doubleArray = new double[docsize]; + for (int i = 0; i < docsize; i++) { + intArray[i] = arg0._2._1.get(i).intValue(); + doubleArray[i] = arg0._2._2.get(i).intValue(); + } + Vector sv = Vectors.sparse(wordsize, intArray, doubleArray); + return new Tuple2<>(arg0._1, sv); + } + }); + + RowMatrix docwordMatrix = new RowMatrix(docVectorRDD.values().rdd()); + + LabeledRowMatrix labeledRowMatrix = new LabeledRowMatrix(); + labeledRowMatrix.rowMatrix = docwordMatrix; + labeledRowMatrix.rowkeys = docVectorRDD.keys().collect(); + labeledRowMatrix.colkeys = wordIDRDD.keys().collect(); + + return labeledRowMatrix; + } + + /** + * loadVectorFromCSV: Load term vector from csv file. + * + * @param spark + * spark instance + * @param csvFileName + * csv matrix file + * @param skipNum + * the numbers of rows which should be skipped Ignore the top skip + * number rows of the csv file + * @return JavaPairRDD, each key is a term, and value is the vector of the + * term in feature space. + */ + public static JavaPairRDD<String, Vector> loadVectorFromCSV(SparkDriver spark, String csvFileName, int skipNum) { + // skip the first line (header), important! 
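+ // Expected row layout: term,v1,v2,...,vn with the term name in column 0. + // Observation (not a fix): Arrays.copyOfRange below uses the exclusive upper bound + // fields.length - 1, so the final column of each row is never parsed into the vector.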
+ JavaRDD<String> importRDD = spark.sc.textFile(csvFileName); + JavaPairRDD<String, Long> importIdRDD = importRDD.zipWithIndex().filter(new Function<Tuple2<String, Long>, Boolean>() { + /** */ + private static final long serialVersionUID = 1L; + + @Override + public Boolean call(Tuple2<String, Long> v1) throws Exception { + if (v1._2 > (skipNum - 1)) { + return true; + } + return false; + } + }); + + if (importIdRDD.count() == 0) { + return null; + } + + return importIdRDD.mapToPair(new PairFunction<Tuple2<String, Long>, String, Vector>() { + /** */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, Vector> call(Tuple2<String, Long> t) throws Exception { + String[] fields = t._1.split(","); + String word = fields[0]; + int fieldsize = fields.length; + int nStart = 1; + int nEnd = fieldsize - 1; + if (fieldsize < 2) { + nStart = 0; + nEnd = 0; + } + String[] numfields = Arrays.copyOfRange(fields, nStart, nEnd); + + double[] nums = Stream.of(numfields).mapToDouble(Double::parseDouble).toArray(); + Vector vec = Vectors.dense(nums); + return new Tuple2<>(word, vec); + } + }); + } + + /** + * Convert vectorRDD to indexed row matrix. + * + * @param vecs + * Vector RDD + * @return IndexedRowMatrix + */ + public static IndexedRowMatrix buildIndexRowMatrix(JavaRDD<Vector> vecs) { + JavaRDD<IndexedRow> indexrows = vecs.zipWithIndex().map(new Function<Tuple2<Vector, Long>, IndexedRow>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public IndexedRow call(Tuple2<Vector, Long> docId) { + return new IndexedRow(docId._2, docId._1); + } + }); + return new IndexedRowMatrix(indexrows.rdd()); + } + + /** + * Transpose matrix + * + * @param indexedMatrix + * spark indexed matrix + * @return rowmatrix, each row is corresponding to the column in the original + * matrix and vice versa + */ + public static RowMatrix transposeMatrix(IndexedRowMatrix indexedMatrix) { + return indexedMatrix.toCoordinateMatrix().transpose().toRowMatrix(); + } + + /** + * Output matrix to a CSV file. + * + * @param matrix + * spark row matrix + * @param rowKeys + * matrix row names + * @param colKeys + * matrix coloum names + * @param fileName + * csv file name + */ + public static void exportToCSV(RowMatrix matrix, List<String> rowKeys, List<String> colKeys, String fileName) { + + if (matrix.rows().isEmpty()) { + return; + } + + int rownum = (int) matrix.numRows(); + int colnum = (int) matrix.numCols(); + List<Vector> rows = matrix.rows().toJavaRDD().collect(); + + File file = new File(fileName); + if (file.exists()) { + file.delete(); + } + try { + file.createNewFile(); + FileWriter fw = new FileWriter(file.getAbsoluteFile()); + BufferedWriter bw = new BufferedWriter(fw); + String coltitle = " Num" + ","; + for (int j = 0; j < colnum; j++) { + coltitle += "\"" + colKeys.get(j) + "\","; + } + coltitle = coltitle.substring(0, coltitle.length() - 1); + bw.write(coltitle + "\n"); + + for (int i = 0; i < rownum; i++) { + double[] rowvlaue = rows.get(i).toArray(); + String row = rowKeys.get(i) + ","; + for (int j = 0; j < colnum; j++) { + row += rowvlaue[j] + ","; + } + row = row.substring(0, row.length() - 1); + bw.write(row + "\n"); + } + + bw.close(); + + } catch (IOException e) { + e.printStackTrace(); + + } + } +}
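Taken together, the MatrixUtil methods above form a small latent-semantic-analysis pipeline: document-term pairs become a term-by-document count matrix, raw counts are reweighted with TF-IDF, and SVD projects the result into a low-dimensional space. A minimal sketch of that flow (illustrative only: the JavaSparkContext jsc, the demo class, and the sample documents are assumptions; the MatrixUtil and LabeledRowMatrix calls come from the diff above):

import java.util.Arrays;
import java.util.List;
import org.apache.sdap.mudrod.utils.LabeledRowMatrix;
import org.apache.sdap.mudrod.utils.MatrixUtil;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
import scala.Tuple2;

public class MatrixUtilDemo {
  // jsc would be supplied by the surrounding application (e.g. via SparkDriver).
  public static void lsaDemo(JavaSparkContext jsc) {
    // Two toy documents keyed by data set name.
    JavaPairRDD<String, List<String>> docTerms = jsc.parallelizePairs(Arrays.asList(
        new Tuple2<String, List<String>>("dataset-a", Arrays.asList("ocean", "salinity", "ocean")),
        new Tuple2<String, List<String>>("dataset-b", Arrays.asList("ocean", "wind"))));

    LabeledRowMatrix wordDoc = MatrixUtil.createWordDocMatrix(docTerms); // rows = terms, columns = docs
    RowMatrix tfidf = MatrixUtil.createTFIDFMatrix(wordDoc.rowMatrix);   // IDF-weight the raw counts
    RowMatrix lsa = MatrixUtil.buildSVDMatrix(tfidf, 2);                 // clamped to at most numCols dims
    System.out.println(lsa.numRows() + " terms projected into " + lsa.numCols() + " dimensions");
  }
}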
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/RDDUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/RDDUtil.java b/core/src/main/java/org/apache/sdap/mudrod/utils/RDDUtil.java new file mode 100644 index 0000000..8c2e64c --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/RDDUtil.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.utils; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; + +import java.util.Iterator; +import java.util.List; + +/** + * ClassName: RDDUtil Function: Mudrod Spark RDD common methods + */ +public class RDDUtil { + + public RDDUtil() { + } + + /** + * getAllWordsInDoc: Extracted all unique terms from all docs. + * + * @param docwordRDD Pair RDD, each key is a doc, and value is term list extracted from + * that doc. + * @return unique term list + */ + public static JavaRDD<String> getAllWordsInDoc(JavaPairRDD<String, List<String>> docwordRDD) { + JavaRDD<String> wordRDD = docwordRDD.values().flatMap(new FlatMapFunction<List<String>, String>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Iterator<String> call(List<String> list) { + return list.iterator(); + } + }).distinct(); + + return wordRDD; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/SVDUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/SVDUtil.java b/core/src/main/java/org/apache/sdap/mudrod/utils/SVDUtil.java new file mode 100644 index 0000000..1cd4e00 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/SVDUtil.java @@ -0,0 +1,118 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.utils; + +import org.apache.sdap.mudrod.discoveryengine.MudrodAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix; +import org.apache.spark.mllib.linalg.distributed.RowMatrix; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +/** + * Singular value decomposition + */ +public class SVDUtil extends MudrodAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + // wordRDD: terms extracted from all documents + JavaRDD<String> wordRDD; + // svdMatrix: svd matrix + private RowMatrix svdMatrix; + // simMatrix: similarity matrix + private CoordinateMatrix simMatrix; + + /** + * Creates a new instance of SVDUtil. + * + * @param config the Mudrod configuration + * @param es the Elasticsearch driver + * @param spark the Spark driver + */ + public SVDUtil(Properties config, ESDriver es, SparkDriver spark) { + super(config, es, spark); + } + + /** + * Build SVD matrix from document-terms pairs. + * + * @param docwordRDD JavaPairRDD, key is the short name of a data set and values are the terms in + * that data set + * @param svdDimension dimension of the matrix after singular value decomposition + * @return row matrix + */ + public RowMatrix buildSVDMatrix(JavaPairRDD<String, List<String>> docwordRDD, int svdDimension) { + + RowMatrix svdMatrix = null; + LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(docwordRDD); + RowMatrix tfIdfMatrix = MatrixUtil.createTFIDFMatrix(wordDocMatrix.rowMatrix); + svdMatrix = MatrixUtil.buildSVDMatrix(tfIdfMatrix, svdDimension); + this.svdMatrix = svdMatrix; + this.wordRDD = RDDUtil.getAllWordsInDoc(docwordRDD); + return svdMatrix; + } + + /** + * Build SVD matrix from a CSV file.
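+ * <p>Note (from the implementation below): the file is read with MatrixUtil.loadVectorFromCSV(spark, tfidfCSVfile, 2), so the first two rows are skipped as headers and each remaining row must begin with a term followed by its comma-separated weights.</p>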
+ * + * @param tfidfCSVfile TF-IDF matrix CSV file + * @param svdDimension dimension of the matrix after singular value decomposition + * @return row matrix + */ + public RowMatrix buildSVDMatrix(String tfidfCSVfile, int svdDimension) { + RowMatrix svdMatrix = null; + JavaPairRDD<String, Vector> tfidfRDD = MatrixUtil.loadVectorFromCSV(spark, tfidfCSVfile, 2); + JavaRDD<Vector> vectorRDD = tfidfRDD.values(); + + svdMatrix = MatrixUtil.buildSVDMatrix(vectorRDD, svdDimension); + this.svdMatrix = svdMatrix; + + this.wordRDD = tfidfRDD.keys(); + + return svdMatrix; + } + + /** + * Calculate similarity + */ + public void calSimilarity() { + CoordinateMatrix simMatrix = SimilarityUtil.calculateSimilarityFromMatrix(svdMatrix); + this.simMatrix = simMatrix; + } + + /** + * Insert linkage triples into Elasticsearch + * + * @param index index name + * @param type linkage triple type + */ + public void insertLinkageToES(String index, String type) { + List<LinkageTriple> triples = SimilarityUtil.matrixToTriples(wordRDD, simMatrix); + try { + LinkageTriple.insertTriples(es, triples, index, type); + } catch (IOException e) { + e.printStackTrace(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/SimilarityUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/SimilarityUtil.java b/core/src/main/java/org/apache/sdap/mudrod/utils/SimilarityUtil.java new file mode 100644 index 0000000..6fdc66d --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/SimilarityUtil.java @@ -0,0 +1,277 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.utils; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix; +import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix; +import org.apache.spark.mllib.linalg.distributed.MatrixEntry; +import org.apache.spark.mllib.linalg.distributed.RowMatrix; +import scala.Tuple2; + +import java.util.List; + +/** + * Similarity and distance calculation utilities + */ +public class SimilarityUtil { + + public static final int SIM_COSINE = 3; + public static final int SIM_HELLINGER = 2; + public static final int SIM_PEARSON = 1; + /** + * CalSimilarityFromMatrix: Calculate term similarity from matrix. + * + * @param svdMatrix
Each row is corresponding to a term, and each column is + * corresponding to a dimension of feature + * @return CoordinateMatrix, each row is corresponding to a term, and each + * column is also a term, the cell value is the similarity between the + * two terms + */ + public static CoordinateMatrix calculateSimilarityFromMatrix(RowMatrix svdMatrix) { + JavaRDD<Vector> vecs = svdMatrix.rows().toJavaRDD(); + return SimilarityUtil.calculateSimilarityFromVector(vecs); + } + + /** + * CalSimilarityFromVector:Calculate term similarity from vector. + * + * @param vecs Each vector is corresponding to a term in the feature space. + * @return CoordinateMatrix, each row is corresponding to a term, and each + * column is also a term, the cell value is the similarity between the + * two terms + */ + public static CoordinateMatrix calculateSimilarityFromVector(JavaRDD<Vector> vecs) { + IndexedRowMatrix indexedMatrix = MatrixUtil.buildIndexRowMatrix(vecs); + RowMatrix transposeMatrix = MatrixUtil.transposeMatrix(indexedMatrix); + return transposeMatrix.columnSimilarities(); + } + + /** + * Calculate term similarity from vector. + * + * @param importRDD the {@link org.apache.spark.api.java.JavaPairRDD} + * data structure containing the vectors. + * @param simType the similarity calculation to execute e.g. + * <ul> + * <li>{@link org.apache.sdap.mudrod.utils.SimilarityUtil#SIM_COSINE} - 3,</li> + * <li>{@link org.apache.sdap.mudrod.utils.SimilarityUtil#SIM_HELLINGER} - 2,</li> + * <li>{@link org.apache.sdap.mudrod.utils.SimilarityUtil#SIM_PEARSON} - 1</li> + * </ul> + * @return a new {@link org.apache.spark.api.java.JavaPairRDD} + */ + public static JavaRDD<LinkageTriple> calculateSimilarityFromVector(JavaPairRDD<String, Vector> importRDD, int simType) { + JavaRDD<Tuple2<String, Vector>> importRDD1 = importRDD.map(f -> new Tuple2<String, Vector>(f._1, f._2)); + JavaPairRDD<Tuple2<String, Vector>, Tuple2<String, Vector>> cartesianRDD = importRDD1.cartesian(importRDD1); + + return cartesianRDD.map(new Function<Tuple2<Tuple2<String, Vector>, Tuple2<String, Vector>>, LinkageTriple>() { + + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public LinkageTriple call(Tuple2<Tuple2<String, Vector>, Tuple2<String, Vector>> arg) { + String keyA = arg._1._1; + String keyB = arg._2._1; + + if (keyA.equals(keyB)) { + return null; + } + + Vector vecA = arg._1._2; + Vector vecB = arg._2._2; + Double weight = 0.0; + + if (simType == SimilarityUtil.SIM_PEARSON) { + weight = SimilarityUtil.pearsonDistance(vecA, vecB); + } else if (simType == SimilarityUtil.SIM_HELLINGER) { + weight = SimilarityUtil.hellingerDistance(vecA, vecB); + } + + LinkageTriple triple = new LinkageTriple(); + triple.keyA = keyA; + triple.keyB = keyB; + triple.weight = weight; + return triple; + } + }).filter(new Function<LinkageTriple, Boolean>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Boolean call(LinkageTriple arg0) throws Exception { + if (arg0 == null) { + return false; + } + return true; + } + }); + } + + /** + * MatrixtoTriples:Convert term similarity matrix to linkage triple list. 
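+ * <p>Implementation outline: the keys are zipped with indices, each MatrixEntry (i, j, value) becomes a LinkageTriple keyed by i, and two successive leftOuterJoins against that index resolve keyAId and keyBId back to term strings before collecting.</p>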
+ * + * @param keys each key is a term + * @param simMatirx term similarity matrix, in which each row and column is a term and + * the cell value is the similarity between the two terms + * @return linkage triple list + */ + public static List<LinkageTriple> matrixToTriples(JavaRDD<String> keys, CoordinateMatrix simMatirx) { + if (simMatirx.numCols() != keys.count()) { + return null; + } + + // index words + JavaPairRDD<Long, String> keyIdRDD = JavaPairRDD.fromJavaRDD(keys.zipWithIndex().map(new Function<Tuple2<String, Long>, Tuple2<Long, String>>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Long, String> call(Tuple2<String, Long> docId) { + return docId.swap(); + } + })); + + JavaPairRDD<Long, LinkageTriple> entriesRowRDD = simMatirx.entries().toJavaRDD().mapToPair(new PairFunction<MatrixEntry, Long, LinkageTriple>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Long, LinkageTriple> call(MatrixEntry t) throws Exception { + LinkageTriple triple = new LinkageTriple(); + triple.keyAId = t.i(); + triple.keyBId = t.j(); + triple.weight = t.value(); + return new Tuple2<>(triple.keyAId, triple); + } + }); + + JavaPairRDD<Long, LinkageTriple> entriesColRDD = entriesRowRDD.leftOuterJoin(keyIdRDD).values().mapToPair(new PairFunction<Tuple2<LinkageTriple, Optional<String>>, Long, LinkageTriple>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Long, LinkageTriple> call(Tuple2<LinkageTriple, Optional<String>> t) throws Exception { + LinkageTriple triple = t._1; + Optional<String> stra = t._2; + if (stra.isPresent()) { + triple.keyA = stra.get(); + } + return new Tuple2<>(triple.keyBId, triple); + } + }); + + JavaRDD<LinkageTriple> tripleRDD = entriesColRDD.leftOuterJoin(keyIdRDD).values().map(new Function<Tuple2<LinkageTriple, Optional<String>>, LinkageTriple>() { + /** + * + */ + private static final long serialVersionUID = 1L; + + @Override + public LinkageTriple call(Tuple2<LinkageTriple, Optional<String>> t) throws Exception { + LinkageTriple triple = t._1; + Optional<String> strb = t._2; + if (strb.isPresent()) { + triple.keyB = strb.get(); + } + return triple; + } + }); + return tripleRDD.collect(); + } + + /** + * Calculate similarity (Hellinger Distance) between vectors + * + * @param vecA initial vector from which to calculate a similarity + * @param vecB second vector involved in similarity calculation + * @return similarity between two vectors + */ + public static double hellingerDistance(Vector vecA, Vector vecB) { + double[] arrA = vecA.toArray(); + double[] arrB = vecB.toArray(); + + double sim = 0.0; + + int arrsize = arrA.length; + for (int i = 0; i < arrsize; i++) { + double a = arrA[i]; + double b = arrB[i]; + double sqrtDiff = Math.sqrt(a) - Math.sqrt(b); + sim += sqrtDiff * sqrtDiff; + } + + sim = sim / Math.sqrt(2); + + return sim; + } + + /** + * Calculate similarity (Pearson Distance) between vectors + * + * @param vecA initial vector from which to calculate a similarity + * @param vecB second vector involved in similarity calculation + * @return similarity between two vectors + */ + public static double pearsonDistance(Vector vecA, Vector vecB) { + double[] arrA = vecA.toArray(); + double[] arrB = vecB.toArray(); + + int viewA = 0; + int viewB = 0; + int viewAB = 0; + + int arrsize = arrA.length; + for (int i = 0; i < arrsize; i++) { + if (arrA[i] > 0) { + viewA++; + } + + if (arrB[i] > 0) { + viewB++; + } + + if (arrB[i] 
> 0 && arrA[i] > 0) { + viewAB++; + } + } + return viewAB / (Math.sqrt(viewA) * Math.sqrt(viewB)); + } + + /** + * calculate similarity between vectors + * + * @param vecA initial vector from which to calculate a similarity + * @param vecB second vector involved in similarity calculation + * @return similarity between two vectors + */ + public static double cosineDistance(Vector vecA, Vector vecB) { + return 1; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/utils/package-info.java new file mode 100644 index 0000000..1adb0b9 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes utilities classes for calculating similarity and + * parsing HTTP request + */ +package org.apache.sdap.mudrod.utils; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/package-info.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/package-info.java new file mode 100644 index 0000000..9c87a7d --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/package-info.java @@ -0,0 +1,18 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package includes web log pre-processing, processing, and data structure + * classes. 
+ */ +package org.apache.sdap.mudrod.weblog; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/KGreedyPartitionSolver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/KGreedyPartitionSolver.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/KGreedyPartitionSolver.java new file mode 100644 index 0000000..8f4e263 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/KGreedyPartitionSolver.java @@ -0,0 +1,142 @@ +package org.apache.sdap.mudrod.weblog.partition; + +import java.util.*; + +public class KGreedyPartitionSolver implements ThePartitionProblemSolver { + + public boolean bsorted = false; + + public KGreedyPartitionSolver() { + // default constructor + } + + public KGreedyPartitionSolver(boolean bsorted) { + this.bsorted = true; + } + + @Override + public Map<String, Integer> solve(Map<String, Double> labelNums, int k) { + List<Double> lista = null; + List<String> months = null; + + if (!this.bsorted) { + LinkedHashMap sortedMap = this.sortMapByValue(labelNums); + lista = new ArrayList(sortedMap.values()); + months = new ArrayList(sortedMap.keySet()); + } else { + lista = new ArrayList(labelNums.values()); + months = new ArrayList(labelNums.keySet()); + } + + List<List<Double>> parts = new ArrayList<>(); + List<List<String>> splitMonths = new ArrayList<>(); + + for (int i = 0; i < k; i++) { + List<Double> part = new ArrayList(); + parts.add(part); + + List<String> monthList = new ArrayList(); + splitMonths.add(monthList); + } + + int j = 0; + for (Double lista1 : lista) { + + Double minimalSum = -1.0; + int position = 0; + for (int i = 0; i < parts.size(); i++) { + List<Double> part = parts.get(i); + if (minimalSum == -1) { + minimalSum = Suma(part); + position = i; + } else if (Suma(part) < minimalSum) { + minimalSum = Suma(part); + position = i; + } + } + + List<Double> part = parts.get(position); + part.add(lista1); + parts.set(position, part); + + List<String> month = splitMonths.get(position); + month.add(months.get(j)); + splitMonths.set(position, month); + j++; + } + + /* for(int i=0; i<splitMonths.size(); i++){ + System.out.println("group:" + i); + printStrList(splitMonths.get(i)); + } + + for(int i=0; i<parts.size(); i++){ + print(parts.get(i)); + }*/ + + Map<String, Integer> LabelGroups = new HashMap<String, Integer>(); + for (int i = 0; i < splitMonths.size(); i++) { + List<String> list = splitMonths.get(i); + for (int m = 0; m < list.size(); m++) { + LabelGroups.put(list.get(m), i); + } + } + + return LabelGroups; + } + + public LinkedHashMap<String, Double> sortMapByValue(Map passedMap) { + List mapKeys = new ArrayList(passedMap.keySet()); + List mapValues = new ArrayList(passedMap.values()); + Collections.sort(mapValues, Collections.reverseOrder()); + Collections.sort(mapKeys, Collections.reverseOrder()); + + LinkedHashMap sortedMap = new LinkedHashMap(); + + Iterator valueIt = mapValues.iterator(); + while (valueIt.hasNext()) { + Object val = valueIt.next(); + Iterator keyIt = mapKeys.iterator(); + + while (keyIt.hasNext()) { + Object key = keyIt.next(); + String comp1 = passedMap.get(key).toString(); + String comp2 = val.toString(); + + if (comp1.equals(comp2)) { + passedMap.remove(key); + mapKeys.remove(key); + sortedMap.put((String) key, (Double) val); + break; + } + + } + + } + return sortedMap; + } + + private Double 
Suma(List<Double> part) { + Double ret = 0.0; + for (int i = 0; i < part.size(); i++) { + ret += part.get(i); + } + return ret; + } + + private void print(List<Double> list) { + /*for (int i = 0; i < list.size(); i++) { + System.out.print(list.get(i)+","); + }*/ + System.out.print("sum is:" + Suma(list)); + System.out.println(); + } + + private void printStrList(List<String> list) { + for (int i = 0; i < list.size(); i++) { + System.out.print(list.get(i) + ","); + } + System.out.println(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/ThePartitionProblemSolver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/ThePartitionProblemSolver.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/ThePartitionProblemSolver.java new file mode 100644 index 0000000..507140f --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/ThePartitionProblemSolver.java @@ -0,0 +1,8 @@ +package org.apache.sdap.mudrod.weblog.partition; + +import java.util.Map; + +public interface ThePartitionProblemSolver { + + public Map<String, Integer> solve(Map<String, Double> labelNums, int k); +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/logPartitioner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/logPartitioner.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/logPartitioner.java new file mode 100644 index 0000000..7ff2181 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/logPartitioner.java @@ -0,0 +1,33 @@ +package org.apache.sdap.mudrod.weblog.partition; + +import org.apache.spark.Partitioner; + +import java.util.Map; + +public class logPartitioner extends Partitioner { + + int num; + Map<String, Integer> UserGroups; + + public logPartitioner(int num) { + this.num = num; + } + + public logPartitioner(Map<String, Integer> UserGroups, int num) { + this.UserGroups = UserGroups; + this.num = num; + } + + @Override + public int getPartition(Object arg0) { + // TODO Auto-generated method stub + String user = (String) arg0; + return UserGroups.get(user); + } + + @Override + public int numPartitions() { + // TODO Auto-generated method stub + return num; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ClickStreamGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ClickStreamGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ClickStreamGenerator.java new file mode 100644 index 0000000..e678854 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ClickStreamGenerator.java @@ -0,0 +1,74 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.weblog.pre; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.utils.LabeledRowMatrix; +import org.apache.sdap.mudrod.utils.MatrixUtil; +import org.apache.sdap.mudrod.weblog.structure.ClickStream; +import org.apache.sdap.mudrod.weblog.structure.SessionExtractor; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Properties; + +/** + * Supports ability to extract click stream data based on session processing results + */ +public class ClickStreamGenerator extends DiscoveryStepAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(ClickStreamGenerator.class); + + public ClickStreamGenerator(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + LOG.info("Starting ClickStreamGenerator..."); + startTime = System.currentTimeMillis(); + + String clickstremMatrixFile = props.getProperty("clickstreamMatrix"); + try { + SessionExtractor extractor = new SessionExtractor(); + JavaRDD<ClickStream> clickstreamRDD = extractor.extractClickStreamFromES(this.props, this.es, this.spark); + int weight = Integer.parseInt(props.getProperty("downloadWeight")); + JavaPairRDD<String, List<String>> metaddataQueryRDD = extractor.bulidDataQueryRDD(clickstreamRDD, weight); + LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(metaddataQueryRDD); + + MatrixUtil.exportToCSV(wordDocMatrix.rowMatrix, wordDocMatrix.rowkeys, wordDocMatrix.colkeys, clickstremMatrixFile); + } catch (Exception e) { + LOG.error("Encountered error within ClickStreamGenerator: {}", e); + } + + endTime = System.currentTimeMillis(); + LOG.info("ClickStreamGenerator complete. Time elapsed {} seconds.", (endTime - startTime) / 1000); + return null; + } + + @Override + public Object execute(Object o) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java new file mode 100644 index 0000000..79a014e --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java @@ -0,0 +1,252 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.weblog.pre; + +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.joda.time.DateTime; +import org.joda.time.Seconds; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * An {@link DiscoveryStepAbstract} + * implementation which detects a known list of Web crawlers which may may be + * present within, and pollute various logs acting as input to Mudrod. + */ +public class CrawlerDetection extends LogAbstract { + /** + * + */ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(CrawlerDetection.class); + + public static final String CRAWLER = "crawler"; + public static final String GOOGLE_BOT = "googlebot"; + public static final String BING_BOT = "bingbot"; + public static final String YAHOO_BOT = "slurp"; + public static final String YACY_BOT = "yacybot"; + public static final String ROGER_BOT = "rogerbot"; + public static final String YANDEX_BOT = "yandexbot"; + + public static final String NO_AGENT_BOT = "-"; + public static final String PERL_BOT = "libwww-perl/"; + public static final String APACHE_HHTP = "apache-httpclient/"; + public static final String JAVA_CLIENT = "java/"; + public static final String CURL = "curl/"; + + /** + * Paramterized constructor to instantiate a configured instance of + * {@link CrawlerDetection} + * + * @param props populated {@link java.util.Properties} object + * @param es {@link ESDriver} object to use in + * crawler detection preprocessing. + * @param spark {@link SparkDriver} object to use in + * crawler detection preprocessing. + */ + public CrawlerDetection(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + public CrawlerDetection() { + super(null, null, null); + } + + @Override + public Object execute() { + LOG.info("Starting Crawler detection {}.", httpType); + startTime = System.currentTimeMillis(); + try { + checkByRate(); + } catch (InterruptedException | IOException e) { + LOG.error("Encountered an error whilst detecting Web crawlers.", e); + } + endTime = System.currentTimeMillis(); + es.refreshIndex(); + LOG.info("Crawler detection complete. 
Time elapsed {} seconds", (endTime - startTime) / 1000); + return null; + } + + /** + * Check known crawler through crawler agent name list + * + * @param agent name of a log line + * @return 1 if the log is initiated by crawler, 0 otherwise + */ + public boolean checkKnownCrawler(String agent) { + agent = agent.toLowerCase(); + if (agent.contains(CRAWLER) || agent.contains(GOOGLE_BOT) || agent.contains(BING_BOT) || agent.contains(APACHE_HHTP) || agent.contains(PERL_BOT) || agent.contains(YAHOO_BOT) || agent + .contains(YANDEX_BOT) || agent.contains(NO_AGENT_BOT) || agent.contains(PERL_BOT) || agent.contains(APACHE_HHTP) || agent.contains(JAVA_CLIENT) || agent.contains(CURL)) { + return true; + } else { + return false; + } + } + + public void checkByRate() throws InterruptedException, IOException { + String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE); + if (processingType.equals("sequential")) { + checkByRateInSequential(); + } else if (processingType.equals("parallel")) { + checkByRateInParallel(); + } + } + + /** + * Check crawler by request sending rate, which is read from configruation + * file + * + * @throws InterruptedException InterruptedException + * @throws IOException IOException + */ + public void checkByRateInSequential() throws InterruptedException, IOException { + es.createBulkProcessor(); + + int rate = Integer.parseInt(props.getProperty("sendingrate")); + + Terms users = this.getUserTerms(this.httpType); + LOG.info("Original User count: {}", Integer.toString(users.getBuckets().size())); + + int userCount = 0; + for (Terms.Bucket entry : users.getBuckets()) { + String user = entry.getKey().toString(); + int count = checkByRate(es, user); + userCount += count; + } + es.destroyBulkProcessor(); + LOG.info("User count: {}", Integer.toString(userCount)); + } + + void checkByRateInParallel() throws InterruptedException, IOException { + + JavaRDD<String> userRDD = getUserRDD(this.httpType); + LOG.info("Original User count: {}", userRDD.count()); + + int userCount = 0; + userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> { + ESDriver tmpES = new ESDriver(props); + tmpES.createBulkProcessor(); + List<Integer> realUserNums = new ArrayList<>(); + while (iterator.hasNext()) { + String s = iterator.next(); + Integer realUser = checkByRate(tmpES, s); + realUserNums.add(realUser); + } + tmpES.destroyBulkProcessor(); + tmpES.close(); + return realUserNums.iterator(); + }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b); + + LOG.info("User count: {}", Integer.toString(userCount)); + } + + private int checkByRate(ESDriver es, String user) { + + int rate = Integer.parseInt(props.getProperty("sendingrate")); + Pattern pattern = Pattern.compile("get (.*?) http/*"); + Matcher matcher; + + BoolQueryBuilder filterSearch = new BoolQueryBuilder(); + filterSearch.must(QueryBuilders.termQuery("IP", user)); + + AggregationBuilder aggregation = AggregationBuilders.dateHistogram("by_minute").field("Time").dateHistogramInterval(DateHistogramInterval.MINUTE).order(Order.COUNT_DESC); + SearchResponse checkRobot = es.getClient().prepareSearch(logIndex).setTypes(httpType, ftpType).setQuery(filterSearch).setSize(0).addAggregation(aggregation).execute().actionGet(); + + Histogram agg = checkRobot.getAggregations().get("by_minute"); + + List<? 
extends Histogram.Bucket> botList = agg.getBuckets(); + long maxCount = botList.get(0).getDocCount(); + if (maxCount >= rate) { + return 0; + } else { + DateTime dt1 = null; + int toLast = 0; + SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(httpType, ftpType).setScroll(new TimeValue(60000)).setQuery(filterSearch).setSize(100).execute().actionGet(); + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + Map<String, Object> result = hit.getSource(); + String logtype = (String) result.get("LogType"); + if (logtype.equals("PO.DAAC")) { + String request = (String) result.get("Request"); + matcher = pattern.matcher(request.trim().toLowerCase()); + boolean find = false; + while (matcher.find()) { + request = matcher.group(1); + result.put("RequestUrl", "http://podaac.jpl.nasa.gov" + request); + find = true; + } + if (!find) { + result.put("RequestUrl", request); + } + } else { + result.put("RequestUrl", result.get("Request")); + } + + DateTimeFormatter fmt = ISODateTimeFormat.dateTime(); + DateTime dt2 = fmt.parseDateTime((String) result.get("Time")); + + if (dt1 == null) { + toLast = 0; + } else { + toLast = Math.abs(Seconds.secondsBetween(dt1, dt2).getSeconds()); + } + result.put("ToLast", toLast); + IndexRequest ir = new IndexRequest(logIndex, cleanupType).source(result); + + es.getBulkProcessor().add(ir); + dt1 = dt2; + } + + scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); + if (scrollResp.getHits().getHits().length == 0) { + break; + } + } + + } + + return 1; + } + + @Override + public Object execute(Object o) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java new file mode 100644 index 0000000..f92d79c --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java @@ -0,0 +1,139 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.weblog.pre; + +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * Supports ability to generate search history (queries) for each individual + * user (IP) + */ +public class HistoryGenerator extends LogAbstract { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(HistoryGenerator.class); + + public HistoryGenerator(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + } + + @Override + public Object execute() { + LOG.info("Starting HistoryGenerator..."); + startTime = System.currentTimeMillis(); + + generateBinaryMatrix(); + + endTime = System.currentTimeMillis(); + LOG.info("HistoryGenerator complete. Time elapsed {} seconds", (endTime - startTime) / 1000); + return null; + } + + /** + * Method to generate a binary user*query matrix (stored in temporary .csv + * file) + */ + public void generateBinaryMatrix() { + try { + + System.out.println(props.getProperty("userHistoryMatrix")); + File file = new File(props.getProperty("userHistoryMatrix")); + if (file.exists()) { + file.delete(); + } + + file.createNewFile(); + + FileWriter fw = new FileWriter(file.getAbsoluteFile()); + BufferedWriter bw = new BufferedWriter(fw); + bw.write("Num" + ","); + + // step 1: write first row of csv + List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX)); + + String[] logIndices = logIndexList.toArray(new String[0]); + String[] statictypeArray = new String[] { this.sessionStats }; + int docCount = es.getDocCount(logIndices, statictypeArray); + + SearchResponse sr = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0) + .addAggregation(AggregationBuilders.terms("IPs").field("IP").size(docCount)).execute().actionGet(); + Terms ips = sr.getAggregations().get("IPs"); + List<String> ipList = new ArrayList<>(); + for (Terms.Bucket entry : ips.getBuckets()) { + if (entry.getDocCount() > Integer.parseInt(props.getProperty(MudrodConstants.MINI_USER_HISTORY))) { // filter + // out + // less + // active users/ips + ipList.add(entry.getKey().toString()); + } + } + bw.write(String.join(",", ipList) + "\n"); + + // step 2: step the rest rows of csv + SearchRequestBuilder sr2Builder = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0) + .addAggregation(AggregationBuilders.terms("KeywordAgg").field("keywords").size(docCount).subAggregation(AggregationBuilders.terms("IPAgg").field("IP").size(docCount))); + + SearchResponse sr2 = sr2Builder.execute().actionGet(); + Terms keywords = sr2.getAggregations().get("KeywordAgg"); + + for (Terms.Bucket keyword : keywords.getBuckets()) { + + Map<String, Integer> ipMap = new HashMap<>(); + Terms ipAgg = keyword.getAggregations().get("IPAgg"); + + int distinctUser = ipAgg.getBuckets().size(); + if 
+
+        // step 2: write the remaining rows of the csv, one row per keyword
+        SearchRequestBuilder sr2Builder = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
+            .addAggregation(AggregationBuilders.terms("KeywordAgg").field("keywords").size(docCount).subAggregation(AggregationBuilders.terms("IPAgg").field("IP").size(docCount)));
+
+        SearchResponse sr2 = sr2Builder.execute().actionGet();
+        Terms keywords = sr2.getAggregations().get("KeywordAgg");
+
+        for (Terms.Bucket keyword : keywords.getBuckets()) {
+
+          Map<String, Integer> ipMap = new HashMap<>();
+          Terms ipAgg = keyword.getAggregations().get("IPAgg");
+
+          int distinctUser = ipAgg.getBuckets().size();
+          if (distinctUser > Integer.parseInt(props.getProperty(MudrodConstants.MINI_USER_HISTORY))) {
+            bw.write(keyword.getKey() + ",");
+            for (Terms.Bucket ip : ipAgg.getBuckets()) {
+              ipMap.put(ip.getKey().toString(), 1);
+            }
+            for (int i = 0; i < ipList.size(); i++) {
+              bw.write(ipMap.getOrDefault(ipList.get(i), 0) + ",");
+            }
+            bw.write("\n");
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Error generating the user history matrix.", e);
+    }
+
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ImportLogFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ImportLogFile.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ImportLogFile.java
new file mode 100644
index 0000000..ca47f01
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ImportLogFile.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.pre;
+
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+import org.apache.sdap.mudrod.weblog.structure.ApacheAccessLog;
+import org.apache.sdap.mudrod.weblog.structure.FtpLog;
+import org.apache.spark.api.java.JavaRDD;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+/**
+ * Supports ability to parse and process FTP and HTTP log files
+ */
+public class ImportLogFile extends LogAbstract {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ImportLogFile.class);
+
+  private static final long serialVersionUID = 1L;
+
+  String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] " + "\"(.+?)\" (\\d{3}) (\\d+|-) \"((?:[^\"]|\")+)\" \"([^\"]+)\"";
+
+  public static final int NUM_FIELDS = 9;
+  Pattern p = Pattern.compile(logEntryPattern);
+  transient Matcher matcher;
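+
+  // Illustrative note (added commentary, not from the original commit):
+  // logEntryPattern follows the Apache combined log format, so a line such as
+  //   127.0.0.1 - - [03/Jan/2017:21:43:51 -0800] "GET /datasetlist?search=ghrsst HTTP/1.1" 200 5467 "-" "Mozilla/5.0"
+  // is captured as group(1)=IP, group(4)=timestamp, group(5)=request,
+  // group(6)=response code, group(7)=bytes, group(8)=referer and
+  // group(9)=user agent.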
+
+  /**
+   * Constructor supporting a number of parameters documented below.
+   *
+   * @param props a {@link java.util.Properties} instance containing Mudrod
+   *              configuration key, value pairs.
+   * @param es    the {@link ESDriver} used to persist log files.
+   * @param spark the {@link SparkDriver} used to process input log files.
+   */
+  public ImportLogFile(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("Starting Log Import {}", props.getProperty(MudrodConstants.TIME_SUFFIX));
+    startTime = System.currentTimeMillis();
+    readFile();
+    endTime = System.currentTimeMillis();
+    LOG.info("Log Import complete. Time elapsed {} seconds", (endTime - startTime) / 1000);
+    es.refreshIndex();
+    return null;
+  }
+
+  /**
+   * Utility function that converts three-letter month abbreviations such as
+   * 'Jan' within a time string to their numeric equivalents.
+   *
+   * @param time the input {@link java.lang.String} to convert.
+   * @return the time string with the month converted to its numeric form.
+   */
+  public String switchtoNum(String time) {
+    String newTime = time;
+    String[] months = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
+    for (int i = 0; i < months.length; i++) {
+      if (newTime.contains(months[i])) {
+        newTime = newTime.replace(months[i], String.valueOf(i + 1));
+        break;
+      }
+    }
+    return newTime;
+  }
+
+  public void readFile() {
+
+    String httplogpath = null;
+    String ftplogpath = null;
+
+    File directory = new File(props.getProperty(MudrodConstants.DATA_DIR));
+    File[] fList = directory.listFiles();
+    if (fList == null) {
+      LOG.error("Data directory {} does not exist or cannot be read.", directory.getAbsolutePath());
+      return;
+    }
+    for (File file : fList) {
+      if (file.isFile() && file.getName().contains(props.getProperty(MudrodConstants.TIME_SUFFIX))) {
+        if (file.getName().contains(props.getProperty(MudrodConstants.HTTP_PREFIX))) {
+          httplogpath = file.getAbsolutePath();
+        }
+        if (file.getName().contains(props.getProperty(MudrodConstants.FTP_PREFIX))) {
+          ftplogpath = file.getAbsolutePath();
+        }
+      }
+    }
+
+    if (httplogpath == null || ftplogpath == null) {
+      LOG.error("HTTP or FTP logs cannot be found; please check your data directory.");
+      return;
+    }
+
+    String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE, "parallel");
+    if ("sequential".equals(processingType)) {
+      readFileInSequential(httplogpath, ftplogpath);
+    } else if ("parallel".equals(processingType)) {
+      readFileInParallel(httplogpath, ftplogpath);
+    }
+  }
+
+  /**
+   * Read the FTP and HTTP logs sequentially, line by line, persisting entries
+   * via the Elasticsearch bulk processor.
+   *
+   * @param httplogpath path to the http log file
+   * @param ftplogpath  path to the ftp log file
+   */
+  public void readFileInSequential(String httplogpath, String ftplogpath) {
+    es.createBulkProcessor();
+    try {
+      readLogFile(httplogpath, "http", logIndex, httpType);
+      readLogFile(ftplogpath, "FTP", logIndex, ftpType);
+    } catch (IOException e) {
+      LOG.error("Error whilst reading log file.", e);
+    }
+    es.destroyBulkProcessor();
+  }
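+
+  // Illustrative note (added commentary; the property values here are
+  // hypothetical): if MudrodConstants.HTTP_PREFIX resolved to "WWW",
+  // FTP_PREFIX to "FTP" and TIME_SUFFIX to "201701", readFile() above would
+  // match files named like WWW.201701 and FTP.201701 inside
+  // MudrodConstants.DATA_DIR.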
+
+  /**
+   * Read the FTP and HTTP logs in parallel using Spark, persisting entries via
+   * the elasticsearch-hadoop connector.
+   *
+   * @param httplogpath path to the http log file
+   * @param ftplogpath  path to the ftp log file
+   */
+  public void readFileInParallel(String httplogpath, String ftplogpath) {
+    importHttpfile(httplogpath);
+    importFtpfile(ftplogpath);
+  }
+
+  public void importHttpfile(String httplogpath) {
+    // import http logs
+    JavaRDD<String> accessLogs = spark.sc.textFile(httplogpath, this.partition).map(ApacheAccessLog::parseFromLogLine).filter(ApacheAccessLog::checknull);
+
+    JavaEsSpark.saveJsonToEs(accessLogs, logIndex + "/" + this.httpType);
+  }
+
+  public void importFtpfile(String ftplogpath) {
+    // import ftp logs
+    JavaRDD<String> ftpLogs = spark.sc.textFile(ftplogpath, this.partition).map(FtpLog::parseFromLogLine).filter(FtpLog::checknull);
+
+    JavaEsSpark.saveJsonToEs(ftpLogs, logIndex + "/" + this.ftpType);
+  }
+
+  /**
+   * Process a log file on the local file system with the relevant parameters
+   * documented below.
+   *
+   * @param fileName the {@link java.lang.String} path to the log file on the
+   *                 file system
+   * @param protocol whether to process 'http' or 'FTP'
+   * @param index    the index name to write logs to
+   * @param type     one of the available protocols from which Mudrod logs are obtained.
+   * @throws IOException if there is an error reading anything from the fileName provided.
+   */
+  public void readLogFile(String fileName, String protocol, String index, String type) throws IOException {
+    int count = 0;
+    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
+      String line = br.readLine();
+      while (line != null) {
+        if ("FTP".equals(protocol)) {
+          parseSingleLineFTP(line, index, type);
+        } else {
+          parseSingleLineHTTP(line, index, type);
+        }
+        line = br.readLine();
+        count++;
+      }
+    } catch (FileNotFoundException e) {
+      LOG.error("File not found.", e);
+    } catch (IOException e) {
+      LOG.error("Error reading input file.", e);
+    } finally {
+      LOG.info("Num of {} entries:\t{}", protocol, count);
+    }
+  }
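+
+  // Illustrative note (an assumption inferred from the token indices used in
+  // parseSingleLineFTP below): the FTP entries appear to follow the xferlog
+  // format, e.g.
+  //   Mon Jul 3 21:43:51 2017 1 192.168.0.1 2345 /allData/ghrsst/file.nc b _ o a user ftp 0 * c
+  // where token 6 is the client IP, tokens 1-4 form the timestamp, token 7 is
+  // the byte count and token 8 the requested path.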
+
+  /**
+   * Parse a single FTP log entry
+   *
+   * @param log   a single log line
+   * @param index the index name we wish to persist the log line to
+   * @param type  one of the available protocols from which Mudrod logs are obtained.
+   */
+  public void parseSingleLineFTP(String log, String index, String type) {
+    // split the line once and reuse the tokens, rather than re-splitting per field
+    String[] tokens = log.split(" +");
+    String ip = tokens[6];
+
+    String time = tokens[1] + ":" + tokens[2] + ":" + tokens[3] + ":" + tokens[4];
+
+    time = switchtoNum(time);
+    SimpleDateFormat formatter = new SimpleDateFormat("MM:dd:HH:mm:ss:yyyy");
+    Date date = null;
+    try {
+      date = formatter.parse(time);
+    } catch (ParseException e) {
+      LOG.error("Error whilst parsing the date.", e);
+    }
+    String bytes = tokens[7];
+
+    String request = tokens[8].toLowerCase();
+
+    if (!request.contains("/misc/") && !request.contains("readme")) {
+      try {
+        IndexRequest ir = new IndexRequest(index, type)
+            .source(jsonBuilder().startObject().field("LogType", "ftp").field("IP", ip).field("Time", date).field("Request", request).field("Bytes", Long.parseLong(bytes)).endObject());
+        es.getBulkProcessor().add(ir);
+      } catch (NumberFormatException e) {
+        LOG.error("Error whilst processing numbers", e);
+      } catch (IOException e) {
+        LOG.error("IOError whilst adding to the bulk processor.", e);
+      }
+    }
+  }
+
+  /**
+   * Parse a single HTTP log entry
+   *
+   * @param log   a single log line
+   * @param index the index name we wish to persist the log line to
+   * @param type  one of the available protocols from which Mudrod logs are obtained.
+   */
+  public void parseSingleLineHTTP(String log, String index, String type) {
+    matcher = p.matcher(log);
+    if (!matcher.matches() || NUM_FIELDS != matcher.groupCount()) {
+      return;
+    }
+    String time = matcher.group(4);
+    time = switchtoNum(time);
+    SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
+    Date date = null;
+    try {
+      date = formatter.parse(time);
+    } catch (ParseException e) {
+      LOG.error("Error whilst attempting to parse date.", e);
+    }
+
+    String bytes = matcher.group(7);
+    if ("-".equals(bytes)) {
+      bytes = "0";
+    }
+
+    String request = matcher.group(5).toLowerCase();
+    String agent = matcher.group(9);
+    CrawlerDetection crawlerDe = new CrawlerDetection(this.props, this.es, this.spark);
+    if (!crawlerDe.checkKnownCrawler(agent)) {
+      boolean tag = false;
+      // skip requests for static resources and API endpoints
+      String[] mimeTypes = { ".js", ".css", ".jpg", ".png", ".ico", "image_captcha", "autocomplete", ".gif", "/alldata/", "/api/", "get / http/1.1", ".jpeg", "/ws/" };
+      for (String mimeType : mimeTypes) {
+        if (request.contains(mimeType)) {
+          tag = true;
+          break;
+        }
+      }
+
+      if (!tag) {
+        executeBulkRequest(index, type, matcher, date, bytes);
+      }
+    }
+  }
+
+  private void executeBulkRequest(String index, String type, Matcher matcher, Date date, String bytes) {
+    try {
+      IndexRequest ir = new IndexRequest(index, type).source(
+          jsonBuilder().startObject().field("LogType", "PO.DAAC").field("IP", matcher.group(1)).field("Time", date).field("Request", matcher.group(5)).field("Response", matcher.group(6))
+              .field("Bytes", Integer.parseInt(bytes)).field("Referer", matcher.group(8)).field("Browser", matcher.group(9)).endObject());
+
+      es.getBulkProcessor().add(ir);
+    } catch (NumberFormatException e) {
+      LOG.error("Error whilst processing numbers", e);
+    } catch (IOException e) {
+      LOG.error("IOError whilst adding to the bulk processor.", e);
+    }
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java new file mode 100644 index 0000000..23ddbee --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java @@ -0,0 +1,228 @@ +package org.apache.sdap.mudrod.weblog.pre; + +import org.apache.commons.io.IOUtils; +import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract; +import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.SparkDriver; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.sdap.mudrod.weblog.partition.KGreedyPartitionSolver; +import org.apache.sdap.mudrod.weblog.partition.ThePartitionProblemSolver; +import org.apache.sdap.mudrod.weblog.partition.logPartitioner; +import org.apache.spark.Partition; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.IOException; +import java.io.InputStream; +import java.util.*; + +public class LogAbstract extends DiscoveryStepAbstract { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(LogAbstract.class); + + public String logIndex = null; + public String httpType = null; + public String ftpType = null; + public String cleanupType = null; + public String sessionStats = null; + public int partition = 96; + + public LogAbstract(Properties props, ESDriver es, SparkDriver spark) { + super(props, es, spark); + if (props != null) { + initLogIndex(); + } + } + + protected void initLogIndex() { + logIndex = props.getProperty(MudrodConstants.LOG_INDEX) + props.getProperty(MudrodConstants.TIME_SUFFIX); + httpType = props.getProperty(MudrodConstants.HTTP_TYPE_PREFIX); + ftpType = props.getProperty(MudrodConstants.FTP_TYPE_PREFIX); + cleanupType = props.getProperty(MudrodConstants.CLEANUP_TYPE_PREFIX); + sessionStats = props.getProperty(MudrodConstants.SESSION_STATS_PREFIX); + + InputStream settingsStream = getClass().getClassLoader().getResourceAsStream(ES_SETTINGS); + InputStream mappingsStream = getClass().getClassLoader().getResourceAsStream(ES_MAPPINGS); + JSONObject settingsJSON = null; + JSONObject mappingJSON = null; + + try { + settingsJSON = new JSONObject(IOUtils.toString(settingsStream)); + } catch (JSONException | IOException e1) { + LOG.error("Error reading Elasticsearch settings!", e1); + } + + try { + mappingJSON = new JSONObject(IOUtils.toString(mappingsStream)); + } catch (JSONException | IOException e1) { + LOG.error("Error reading Elasticsearch mappings!", e1); + } + + try { + if (settingsJSON != null && mappingJSON != null) { + this.es.putMapping(logIndex, settingsJSON.toString(), 
mappingJSON.toString());
+      }
+    } catch (IOException e) {
+      LOG.error("Error creating Elasticsearch mappings!", e);
+    }
+  }
+
+  @Override
+  public Object execute() {
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+  public JavaRDD<String> getUserRDD(String... type) {
+    Map<String, Double> userDocs = getUserDocs(type);
+    return parallizeUsers(userDocs);
+  }
+
+  public List<String> getUsers(String type) {
+    Terms users = this.getUserTerms(type);
+    List<String> userList = new ArrayList<>();
+    for (Terms.Bucket entry : users.getBuckets()) {
+      String ip = (String) entry.getKey();
+      userList.add(ip);
+    }
+    return userList;
+  }
+
+  public Terms getUserTerms(String... type) {
+    int docCount = es.getDocCount(logIndex, type);
+
+    SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(type).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
+        .addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount)).execute().actionGet();
+    return sr.getAggregations().get("Users");
+  }
+
+  public Map<String, Double> getUserDocs(String... type) {
+    Terms users = this.getUserTerms(type);
+    Map<String, Double> userList = new HashMap<>();
+    for (Terms.Bucket entry : users.getBuckets()) {
+      String ip = (String) entry.getKey();
+      Long count = entry.getDocCount();
+      userList.put(ip, Double.valueOf(count));
+    }
+    return userList;
+  }
+
+  public Map<String, Long> getUserDailyDocs() {
+    int docCount = es.getDocCount(logIndex, httpType);
+
+    AggregationBuilder dailyAgg = AggregationBuilders.dateHistogram("by_day").field("Time").dateHistogramInterval(DateHistogramInterval.DAY).order(Order.COUNT_DESC);
+
+    SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(httpType).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
+        .addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount).subAggregation(dailyAgg)).execute().actionGet();
+    Terms users = sr.getAggregations().get("Users");
+    Map<String, Long> userList = new HashMap<>();
+    for (Terms.Bucket user : users.getBuckets()) {
+      String ip = (String) user.getKey();
+      Histogram agg = user.getAggregations().get("by_day");
+      List<? extends Histogram.Bucket> dateList = agg.getBuckets();
+      int size = dateList.size();
+      for (int i = 0; i < size; i++) {
+        Long count = dateList.get(i).getDocCount();
+        String date = dateList.get(i).getKey().toString();
+        LOG.debug("User {} had {} requests on {}", ip, count, date);
+        // note: the original code only printed these values and returned an
+        // empty map; keying the result by "ip_date" is an assumption about the
+        // intended output
+        userList.put(ip + "_" + date, count);
+      }
+    }
+
+    return userList;
+  }
+
+  protected void checkUserPartition(JavaRDD<String> userRDD) {
+    List<Partition> partitions = userRDD.partitions();
+    LOG.debug("User RDD has {} partitions", partitions.size());
+    int[] partitionIds = new int[partitions.size()];
+    for (int i = 0; i < partitions.size(); i++) {
+      partitionIds[i] = partitions.get(i).index();
+    }
+
+    List<String>[] userIPs = userRDD.collectPartitions(partitionIds);
+    for (int i = 0; i < userIPs.length; i++) {
+      LOG.debug("Partition {}: {}", i, userIPs[i]);
+    }
+  }
+
+  public JavaRDD<String> parallizeUsers(Map<String, Double> userDocs) {
+    // prepare list for parallelization
+    List<Tuple2<String, Double>> list = new ArrayList<>();
+    for (Map.Entry<String, Double> user : userDocs.entrySet()) {
+      list.add(new Tuple2<>(user.getKey(), user.getValue()));
+    }
+
+    // group users so that each partition holds a roughly equal number of documents
+    ThePartitionProblemSolver solution = new KGreedyPartitionSolver();
+    Map<String, Integer> userGroups = solution.solve(userDocs, this.partition);
+
+    JavaPairRDD<String, Double> pairRdd = spark.sc.parallelizePairs(list);
+    JavaPairRDD<String, Double> userPairRDD = pairRdd.partitionBy(new logPartitioner(userGroups, this.partition));
+
+    // return the repartitioned user RDD
+    return userPairRDD.keys();
+  }
+
+  public Terms getSessionTerms() {
+    int docCount = es.getDocCount(this.logIndex, this.cleanupType);
+
+    SearchResponse sr = es.getClient().prepareSearch(this.logIndex).setTypes(this.cleanupType).setQuery(QueryBuilders.matchAllQuery())
+        .addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount)).execute().actionGet();
+
+    return sr.getAggregations().get("Sessions");
+  }
+
+  public List<String> getSessions() {
+    Terms sessions = this.getSessionTerms();
+    List<String> sessionList = new ArrayList<>();
+    for (Terms.Bucket entry : sessions.getBuckets()) {
+      // ignore invalid sessions and sessions with fewer than three records
+      if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) {
+        String session = (String) entry.getKey();
+        sessionList.add(session);
+      }
+    }
+    return sessionList;
+  }
+}
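For context, a minimal sketch of the user-partitioning flow above (illustrative only: the map is hand-built here, whereas LogAbstract derives it from the "Users" terms aggregation via getUserDocs(), and logAbstract stands for any pre-processing step extending LogAbstract):

    // hypothetical per-user document counts, as getUserDocs() would return
    Map<String, Double> userDocs = new HashMap<>();
    userDocs.put("192.168.0.1", 120.0);
    userDocs.put("10.0.0.7", 45.0);

    // spread users across the configured partitions (96 by default) so that
    // per-partition document totals stay roughly balanced
    // (KGreedyPartitionSolver + logPartitioner)
    JavaRDD<String> userRDD = logAbstract.parallizeUsers(userDocs);

This keeps per-partition work even when a few IPs dominate the logs.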
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RankingTrainDataGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RankingTrainDataGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RankingTrainDataGenerator.java
new file mode 100644
index 0000000..766e853
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RankingTrainDataGenerator.java
@@ -0,0 +1,54 @@
+package org.apache.sdap.mudrod.weblog.pre;
+
+import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.weblog.structure.RankingTrainData;
+import org.apache.sdap.mudrod.weblog.structure.SessionExtractor;
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class RankingTrainDataGenerator extends DiscoveryStepAbstract {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(RankingTrainDataGenerator.class);
+
+  public RankingTrainDataGenerator(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("Starting ranking training data generation.");
+    startTime = System.currentTimeMillis();
+
+    // FIXME: the output path is hard-coded and should instead be read from configuration
+    String rankingTrainFile = "E:\\Mudrod_input_data\\Testing_Data_4_1monthLog+Meta+Onto\\traing.txt";
+    try {
+      SessionExtractor extractor = new SessionExtractor();
+      JavaRDD<RankingTrainData> rankingTrainDataRDD = extractor.extractRankingTrainData(this.props, this.es, this.spark);
+
+      JavaRDD<String> rankingTrainDataJsonRDD = rankingTrainDataRDD.map(RankingTrainData::toJson);
+
+      // coalesce to a single partition so the training data lands in one file
+      rankingTrainDataJsonRDD.coalesce(1, true).saveAsTextFile(rankingTrainFile);
+    } catch (Exception e) {
+      LOG.error("Error whilst generating ranking training data.", e);
+    }
+
+    endTime = System.currentTimeMillis();
+    LOG.info("Ranking train data generation complete. Time elapsed {} seconds.", (endTime - startTime) / 1000);
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+}
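Taken together, a driver might chain these preprocessing steps roughly as follows (a sketch under assumptions: the ESDriver/SparkDriver construction shown is hypothetical, and only the constructors and execute() methods from this commit are used):

    Properties props = new Properties();
    props.setProperty(MudrodConstants.DATA_DIR, "/path/to/raw/logs");
    // ... remaining Mudrod configuration properties ...

    ESDriver es = new ESDriver(props);          // assumed constructor
    SparkDriver spark = new SparkDriver(props); // assumed constructor

    new ImportLogFile(props, es, spark).execute();             // raw HTTP/FTP logs -> Elasticsearch
    new HistoryGenerator(props, es, spark).execute();          // user x query binary matrix (.csv)
    new RankingTrainDataGenerator(props, es, spark).execute(); // ranking training data (JSON)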
