There is a known issue with VectorAssembler that causes it to fail in streaming when any of the input columns are of VectorType and do not carry size information: https://issues.apache.org/jira/browse/SPARK-22346.

This can be fixed by adding size information to the vector columns. I've opened a PR to add a transformer to Spark that helps with this: https://github.com/apache/spark/pull/19746. It would be awesome if you could take a look and see whether it fixes your issue.
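In case it helps, here is a minimal sketch of how that transformer could slot into your pipeline once it is available. Treat it as an illustration only: it assumes the stage lands roughly as proposed in the PR (a VectorSizeHint feature transformer with inputCol, size, and handleInvalid params), so the class name and setters may still change. The column name and size come from your example, where the CountVectorizer writes to "wordfeatures" with a vocabulary of 3, and the other variables are the stages from your code below.

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.VectorSizeHint;

// Declare the size of the "wordfeatures" vector up front, so that
// VectorAssembler does not have to look at the first row to infer it
// (that lookup is what fails on a streaming Dataset).
VectorSizeHint wordFeaturesSizeHint = new VectorSizeHint()
    .setInputCol("wordfeatures")
    .setSize(3)                   // matches the CountVectorizer vocab size
    .setHandleInvalid("error");   // fail fast on rows with a different size

// Insert the hint between the CountVectorizerModel and the VectorAssembler.
Pipeline pipeline = new Pipeline()
    .setStages(new PipelineStage[]{
        labelIndexerModel, tokenizer, countVectorizerModel,
        wordFeaturesSizeHint, vectorAssembler, lr, labelConverter});

With the size recorded in the column metadata, the saved PipelineModel should be able to transform a streaming Dataset without inspecting the data, but please verify against the PR branch, since the exact API is still under review.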
On Sun, Nov 12, 2017 at 5:37 PM Davis Varghese <vergh...@gmail.com> wrote:

> Bago,
>
> Finally I am able to create one which fails consistently. I think the
> issue is caused by the VectorAssembler in the model. In the new code, I
> have 2 features (1 text and 1 number) and I have to run them through a
> VectorAssembler before giving them to LogisticRegression. Code and test
> data below.
>
> import java.util.Arrays;
> import java.util.List;
> import org.apache.spark.ml.Pipeline;
> import org.apache.spark.ml.PipelineModel;
> import org.apache.spark.ml.PipelineStage;
> import org.apache.spark.ml.classification.LogisticRegression;
> import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
> import org.apache.spark.ml.feature.CountVectorizer;
> import org.apache.spark.ml.feature.CountVectorizerModel;
> import org.apache.spark.ml.feature.IndexToString;
> import org.apache.spark.ml.feature.StringIndexer;
> import org.apache.spark.ml.feature.StringIndexerModel;
> import org.apache.spark.ml.feature.Tokenizer;
> import org.apache.spark.ml.feature.VectorAssembler;
> import org.apache.spark.ml.param.ParamMap;
> import org.apache.spark.ml.tuning.ParamGridBuilder;
> import org.apache.spark.ml.tuning.TrainValidationSplit;
> import org.apache.spark.ml.tuning.TrainValidationSplitModel;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.streaming.StreamingQuery;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.Metadata;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
>
> /**
>  * A simple text classification pipeline that classifies tweet sentiment
>  * from input text.
>  */
> public class StreamingIssueCountVectorizerSplitFailed {
>
>   public static void main(String[] args) throws Exception {
>     SparkSession sparkSession = SparkSession.builder()
>         .appName("StreamingIssueCountVectorizer")
>         .master("local[2]")
>         .getOrCreate();
>
>     List<Row> _trainData = Arrays.asList(
>         RowFactory.create("sunny fantastic day", 1, "Positive"),
>         RowFactory.create("fantastic morning match", 1, "Positive"),
>         RowFactory.create("good morning", 1, "Positive"),
>         RowFactory.create("boring evening", 5, "Negative"),
>         RowFactory.create("tragic evening event", 5, "Negative"),
>         RowFactory.create("today is bad ", 5, "Negative")
>     );
>     List<Row> _testData = Arrays.asList(
>         RowFactory.create("sunny morning", 1),
>         RowFactory.create("bad evening", 5)
>     );
>     StructType schema = new StructType(new StructField[]{
>         new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
>         new StructField("time", DataTypes.IntegerType, false, Metadata.empty()),
>         new StructField("sentiment", DataTypes.StringType, true, Metadata.empty())
>     });
>     StructType testSchema = new StructType(new StructField[]{
>         new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
>         new StructField("time", DataTypes.IntegerType, false, Metadata.empty())
>     });
>
>     Dataset<Row> trainData = sparkSession.createDataFrame(_trainData, schema);
>     Dataset<Row> testData = sparkSession.createDataFrame(_testData, testSchema);
>
>     StringIndexerModel labelIndexerModel = new StringIndexer()
>         .setInputCol("sentiment")
>         .setOutputCol("label")
>         .setHandleInvalid("skip")
>         .fit(trainData);
>     Tokenizer tokenizer = new Tokenizer()
>         .setInputCol("tweet")
>         .setOutputCol("words");
>     CountVectorizer countVectorizer = new CountVectorizer()
>         .setInputCol(tokenizer.getOutputCol())
>         .setOutputCol("wordfeatures")
>         .setVocabSize(3)
>         .setMinDF(2)
>         .setMinTF(2)
>         .setBinary(true);
>
>     VectorAssembler vectorAssembler = new VectorAssembler()
>         .setInputCols(new String[]{"wordfeatures", "time"})
>         .setOutputCol("features");
>
>     Dataset<Row> words = tokenizer.transform(trainData);
>     CountVectorizerModel countVectorizerModel = countVectorizer.fit(words);
>
>     LogisticRegression lr = new LogisticRegression()
>         .setMaxIter(10)
>         .setRegParam(0.001);
>
>     IndexToString labelConverter = new IndexToString()
>         .setInputCol("prediction")
>         .setOutputCol("predicted")
>         .setLabels(labelIndexerModel.labels());
>
>     countVectorizerModel.setMinTF(1);
>
>     Pipeline pipeline = new Pipeline()
>         .setStages(new PipelineStage[]{labelIndexerModel, tokenizer,
>             countVectorizerModel, vectorAssembler, lr, labelConverter});
>
>     ParamMap[] paramGrid = new ParamGridBuilder()
>         .addGrid(lr.regParam(), new double[]{0.1, 0.01})
>         .addGrid(lr.fitIntercept())
>         .addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
>         .build();
>
>     MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator();
>     evaluator.setLabelCol("label");
>     evaluator.setPredictionCol("prediction");
>
>     TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
>         .setEstimator(pipeline)
>         .setEvaluator(evaluator)
>         .setEstimatorParamMaps(paramGrid)
>         .setTrainRatio(0.7);
>
>     // Fit the pipeline to training documents.
>     TrainValidationSplitModel trainValidationSplitModel = trainValidationSplit.fit(trainData);
>
>     trainValidationSplitModel.write().overwrite().save("/tmp/CountSplit.model");
>
>     TrainValidationSplitModel _loadedModel = TrainValidationSplitModel.load("/tmp/CountSplit.model");
>     PipelineModel loadedModel = (PipelineModel) (_loadedModel).bestModel();
>
>     // Test on non-streaming data
>     Dataset<Row> predicted = loadedModel.transform(testData);
>     predicted.show();
>     List<Row> _rows = predicted.select("tweet", "predicted").collectAsList();
>     for (Row r : _rows) {
>       System.out.println("[" + r.get(0) + "], prediction=" + r.get(1));
>     }
>
>     // Test on streaming data
>     Dataset<Row> lines = sparkSession.readStream().option("sep", ",")
>         .schema(testSchema).option("header", "true").option("inferSchema", "true")
>         .format("com.databricks.spark.csv")
>         .load("file:///home/davis/Documents/Bugs/StreamingTwitter1");
>
>     StreamingQuery query = loadedModel.transform(lines).writeStream()
>         .outputMode("append")
>         .format("console")
>         .start();
>
>     query.awaitTermination();
>   }
> }
>
> *## Test data csv file*
> tweet,time
> Today is a bright sunny day,2
> How is everyone feeling in office?,2
> I want beef cake. Where is it?,2
> The weather sucks today,2
> I like Vat69.,5
> I don't care,5
> Wassup,5
> Skyfall sucks!,5
>
> *Output*
> *--------*
>
> +-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
> |        tweet|time|           words| wordfeatures|         features|       rawPrediction|         probability|prediction|predicted|
> +-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
> |sunny morning|   1|[sunny, morning]|(3,[1],[1.0])|[0.0,1.0,0.0,1.0]|[3.33130861165765...|[0.96548740116159...|       0.0| Positive|
> |  bad evening|   5|  [bad, evening]|(3,[0],[1.0])|[1.0,0.0,0.0,5.0]|[-4.4513631975340...|[0.01152820807912...|       1.0| Negative|
> +-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
>
> [sunny morning], prediction=Positive
> [bad evening], prediction=Negative
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
> FileSource[file:///home/davis/Documents/Bugs/StreamingTwitter1]
>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
>   at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
>   at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
>   at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2160)
>   at org.apache.spark.sql.Dataset.first(Dataset.scala:2167)
>   at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
>   at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
>   at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
>   at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
>   at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
>   at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
>   at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
>   at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
>   at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
>   at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
>   at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
>   at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
>   at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:305)
>   at StreamingIssueCountVectorizerSplit.main(StreamingIssueCountVectorizerSplit.java:164)
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org