Hi,

After seeing that IDF needed refactoring to use ML vectors instead of MLlib ones, I have created a JIRA ticket at https://issues.apache.org/jira/browse/SPARK-22531 and submitted a PR for it. If anyone could have a look and suggest any changes, it would be really appreciated.
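For context, a quick illustration of the distinction the ticket refers to (my own sketch, not code from the PR): the DataFrame-based spark.ml API is expected to work with org.apache.spark.ml.linalg vectors, while the older RDD-based API uses org.apache.spark.mllib.linalg.

    import org.apache.spark.ml.linalg.Vector;
    import org.apache.spark.ml.linalg.Vectors;

    // spark.ml (DataFrame-based) vector type
    Vector mlVec = Vectors.dense(0.1, 0.2, 0.3);

    // spark.mllib (RDD-based) vector type, which carries a converter to the ml type
    org.apache.spark.mllib.linalg.Vector mllibVec =
        org.apache.spark.mllib.linalg.Vectors.dense(0.1, 0.2, 0.3);
    Vector converted = mllibVec.asML();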
Thank you.

2017-11-15 1:11 GMT+00:00 Bago Amirbekian <b...@databricks.com>:

> There is a known issue with VectorAssembler which causes it to fail in
> streaming if any of the input columns are of VectorType and don't have size
> information: https://issues.apache.org/jira/browse/SPARK-22346.
>
> This can be fixed by adding size information to the vector columns. I've
> made a PR to add a transformer to Spark to help with this,
> https://github.com/apache/spark/pull/19746. It would be awesome if you
> could take a look and see if this would fix your issue.
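A minimal sketch of how that fix would be used, assuming the transformer from the PR is exposed as org.apache.spark.ml.feature.VectorSizeHint with setInputCol/setSize/setHandleInvalid (the names are my reading of the PR description, not confirmed here). It would sit between CountVectorizerModel and VectorAssembler in the pipeline from the quoted code below:

    import org.apache.spark.ml.feature.VectorSizeHint;

    // Declare the size of the "wordfeatures" vector column up front (3 matches the
    // CountVectorizer vocab size below), so VectorAssembler can read the size from
    // the schema instead of inspecting the first row, which fails on a stream.
    VectorSizeHint sizeHint = new VectorSizeHint()
        .setInputCol("wordfeatures")
        .setSize(3)
        .setHandleInvalid("error");

    Pipeline pipeline = new Pipeline()
        .setStages(new PipelineStage[]{labelIndexerModel, tokenizer,
            countVectorizerModel, sizeHint, vectorAssembler, lr, labelConverter});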
> On Sun, Nov 12, 2017 at 5:37 PM Davis Varghese <vergh...@gmail.com> wrote:
>
>> Bago,
>>
>> Finally, I am able to create a test case that fails consistently. I think the
>> issue is caused by the VectorAssembler in the model. In the new code, I have 2
>> features (1 text and 1 number) and I have to run them through a VectorAssembler
>> before giving them to LogisticRegression. Code and test data below.
>>
>> import java.util.Arrays;
>> import java.util.List;
>> import org.apache.spark.ml.Pipeline;
>> import org.apache.spark.ml.PipelineModel;
>> import org.apache.spark.ml.PipelineStage;
>> import org.apache.spark.ml.classification.LogisticRegression;
>> import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
>> import org.apache.spark.ml.feature.CountVectorizer;
>> import org.apache.spark.ml.feature.CountVectorizerModel;
>> import org.apache.spark.ml.feature.IndexToString;
>> import org.apache.spark.ml.feature.StringIndexer;
>> import org.apache.spark.ml.feature.StringIndexerModel;
>> import org.apache.spark.ml.feature.Tokenizer;
>> import org.apache.spark.ml.feature.VectorAssembler;
>> import org.apache.spark.ml.param.ParamMap;
>> import org.apache.spark.ml.tuning.ParamGridBuilder;
>> import org.apache.spark.ml.tuning.TrainValidationSplit;
>> import org.apache.spark.ml.tuning.TrainValidationSplitModel;
>> import org.apache.spark.sql.Dataset;
>> import org.apache.spark.sql.Row;
>> import org.apache.spark.sql.RowFactory;
>> import org.apache.spark.sql.SparkSession;
>> import org.apache.spark.sql.streaming.StreamingQuery;
>> import org.apache.spark.sql.types.DataTypes;
>> import org.apache.spark.sql.types.Metadata;
>> import org.apache.spark.sql.types.StructField;
>> import org.apache.spark.sql.types.StructType;
>>
>> /**
>>  * A simple text classification pipeline that predicts tweet sentiment from text and time.
>>  */
>> public class StreamingIssueCountVectorizerSplitFailed {
>>
>>   public static void main(String[] args) throws Exception {
>>     SparkSession sparkSession = SparkSession.builder()
>>         .appName("StreamingIssueCountVectorizer")
>>         .master("local[2]")
>>         .getOrCreate();
>>
>>     List<Row> _trainData = Arrays.asList(
>>         RowFactory.create("sunny fantastic day", 1, "Positive"),
>>         RowFactory.create("fantastic morning match", 1, "Positive"),
>>         RowFactory.create("good morning", 1, "Positive"),
>>         RowFactory.create("boring evening", 5, "Negative"),
>>         RowFactory.create("tragic evening event", 5, "Negative"),
>>         RowFactory.create("today is bad ", 5, "Negative")
>>     );
>>     List<Row> _testData = Arrays.asList(
>>         RowFactory.create("sunny morning", 1),
>>         RowFactory.create("bad evening", 5)
>>     );
>>     StructType schema = new StructType(new StructField[]{
>>         new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
>>         new StructField("time", DataTypes.IntegerType, false, Metadata.empty()),
>>         new StructField("sentiment", DataTypes.StringType, true, Metadata.empty())
>>     });
>>     StructType testSchema = new StructType(new StructField[]{
>>         new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
>>         new StructField("time", DataTypes.IntegerType, false, Metadata.empty())
>>     });
>>
>>     Dataset<Row> trainData = sparkSession.createDataFrame(_trainData, schema);
>>     Dataset<Row> testData = sparkSession.createDataFrame(_testData, testSchema);
>>     StringIndexerModel labelIndexerModel = new StringIndexer()
>>         .setInputCol("sentiment")
>>         .setOutputCol("label")
>>         .setHandleInvalid("skip")
>>         .fit(trainData);
>>     Tokenizer tokenizer = new Tokenizer()
>>         .setInputCol("tweet")
>>         .setOutputCol("words");
>>     CountVectorizer countVectorizer = new CountVectorizer()
>>         .setInputCol(tokenizer.getOutputCol())
>>         .setOutputCol("wordfeatures")
>>         .setVocabSize(3)
>>         .setMinDF(2)
>>         .setMinTF(2)
>>         .setBinary(true);
>>
>>     VectorAssembler vectorAssembler = new VectorAssembler()
>>         .setInputCols(new String[]{"wordfeatures", "time"})
>>         .setOutputCol("features");
>>
>>     Dataset<Row> words = tokenizer.transform(trainData);
>>     CountVectorizerModel countVectorizerModel = countVectorizer.fit(words);
>>
>>     LogisticRegression lr = new LogisticRegression()
>>         .setMaxIter(10)
>>         .setRegParam(0.001);
>>
>>     IndexToString labelConverter = new IndexToString()
>>         .setInputCol("prediction")
>>         .setOutputCol("predicted")
>>         .setLabels(labelIndexerModel.labels());
>>
>>     countVectorizerModel.setMinTF(1);
>>
>>     Pipeline pipeline = new Pipeline()
>>         .setStages(new PipelineStage[]{labelIndexerModel, tokenizer,
>>             countVectorizerModel, vectorAssembler, lr, labelConverter});
>>     ParamMap[] paramGrid = new ParamGridBuilder()
>>         .addGrid(lr.regParam(), new double[]{0.1, 0.01})
>>         .addGrid(lr.fitIntercept())
>>         .addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
>>         .build();
>>
>>     MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator();
>>     evaluator.setLabelCol("label");
>>     evaluator.setPredictionCol("prediction");
>>
>>     TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
>>         .setEstimator(pipeline)
>>         .setEvaluator(evaluator)
>>         .setEstimatorParamMaps(paramGrid)
>>         .setTrainRatio(0.7);
>>
>>     // Fit the pipeline to the training documents.
>>     TrainValidationSplitModel trainValidationSplitModel =
>>         trainValidationSplit.fit(trainData);
>>
>>     trainValidationSplitModel.write().overwrite().save("/tmp/CountSplit.model");
>>
>>     TrainValidationSplitModel _loadedModel =
>>         TrainValidationSplitModel.load("/tmp/CountSplit.model");
>>     PipelineModel loadedModel = (PipelineModel) (_loadedModel).bestModel();
>>
>>     // Test on non-streaming data
>>     Dataset<Row> predicted = loadedModel.transform(testData);
>>     predicted.show();
>>     List<Row> _rows = predicted.select("tweet", "predicted").collectAsList();
>>     for (Row r : _rows) {
>>       System.out.println("[" + r.get(0) + "], prediction=" + r.get(1));
>>     }
>>
>>     // Test on streaming data
>>     Dataset<Row> lines = sparkSession.readStream().option("sep", ",")
>>         .schema(testSchema).option("header", "true").option("inferSchema", "true")
>>         .format("com.databricks.spark.csv")
>>         .load("file:///home/davis/Documents/Bugs/StreamingTwitter1");
>>
>>     StreamingQuery query = loadedModel.transform(lines).writeStream()
>>         .outputMode("append")
>>         .format("console")
>>         .start();
>>
>>     query.awaitTermination();
>>   }
>> }
>>
>> ## Test data CSV file
>> tweet,time
>> Today is a bright sunny day,2
>> How is everyone feeling in office?,2
>> I want beef cake. Where is it?,2
>> The weather sucks today,2
>> I like Vat69.,5
>> I don't care,5
>> Wassup,5
>> Skyfall sucks!,5
>>
>> Output
>> ------
>>
>> +-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
>> |        tweet|time|           words| wordfeatures|         features|       rawPrediction|         probability|prediction|predicted|
>> +-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
>> |sunny morning|   1|[sunny, morning]|(3,[1],[1.0])|[0.0,1.0,0.0,1.0]|[3.33130861165765...|[0.96548740116159...|       0.0| Positive|
>> |  bad evening|   5|  [bad, evening]|(3,[0],[1.0])|[1.0,0.0,0.0,5.0]|[-4.4513631975340...|[0.01152820807912...|       1.0| Negative|
>> +-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
>>
>> [sunny morning], prediction=Positive
>> [bad evening], prediction=Negative
>> Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
>> FileSource[file:///home/davis/Documents/Bugs/StreamingTwitter1]
>>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
>>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>>   at scala.collection.immutable.List.foreach(List.scala:381)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>>   at scala.collection.immutable.List.foreach(List.scala:381)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>>   at scala.collection.immutable.List.foreach(List.scala:381)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>>   at scala.collection.immutable.List.foreach(List.scala:381)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
>>   at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
>>   at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
>>   at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
>>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
>>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
>>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
>>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
>>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
>>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
>>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
>>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
>>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2160)
>>   at org.apache.spark.sql.Dataset.first(Dataset.scala:2167)
>>   at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
>>   at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
>>   at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
>>   at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
>>   at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
>>   at scala.Option.getOrElse(Option.scala:121)
>>   at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
>>   at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
>>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>>   at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
>>   at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
>>   at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
>>   at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
>>   at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
>>   at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
>>   at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
>>   at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:305)
>>   at StreamingIssueCountVectorizerSplit.main(StreamingIssueCountVectorizerSplit.java:164)
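For what it's worth, the trace shows where the size lookup happens: VectorAssembler.transform calls Dataset.first() to discover the width of the "wordfeatures" column, and first() is not allowed on a streaming Dataset. Besides the transformer in the PR above, size information can in principle be attached to a vector column as ML attribute metadata. A rough sketch of that idea (my own illustration, not from this thread; `assembled` is a hypothetical DataFrame holding the vector column, and in this particular pipeline the column is produced mid-pipeline, which is exactly why a pipeline-stage transformer is the cleaner fix):

    import org.apache.spark.ml.attribute.AttributeGroup;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import static org.apache.spark.sql.functions.col;

    // Attach group metadata declaring that "wordfeatures" has exactly 3 elements,
    // so downstream stages can read the size from the schema instead of the data.
    AttributeGroup group = new AttributeGroup("wordfeatures", 3);
    Dataset<Row> withSize = assembled.withColumn(
        "wordfeatures",
        col("wordfeatures").as("wordfeatures", group.toMetadata()));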