Bago,

Finally I am able to create one that fails consistently. I think the issue is caused by the VectorAssembler in the model. In the new code I have two features (one text and one numeric), and they have to go through a VectorAssembler before being fed to LogisticRegression. Code and test data are below.
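For quick reference, the stage I suspect is just this fragment (same column names as in the full program below); it has to merge the CountVectorizer output vector with the raw numeric column:

VectorAssembler vectorAssembler = new VectorAssembler()
    .setInputCols(new String[]{"wordfeatures", "time"})   // text-derived vector + numeric column
    .setOutputCol("features");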
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.ml.tuning.TrainValidationSplit;
import org.apache.spark.ml.tuning.TrainValidationSplitModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * A simple text classification pipeline over two input columns (tweet text and a numeric
 * time value) that reproduces a VectorAssembler failure on streaming input.
 */
public class StreamingIssueCountVectorizerSplitFailed {

  public static void main(String[] args) throws Exception {
    SparkSession sparkSession = SparkSession.builder()
        .appName("StreamingIssueCountVectorizer")
        .master("local[2]")
        .getOrCreate();

    List<Row> _trainData = Arrays.asList(
        RowFactory.create("sunny fantastic day", 1, "Positive"),
        RowFactory.create("fantastic morning match", 1, "Positive"),
        RowFactory.create("good morning", 1, "Positive"),
        RowFactory.create("boring evening", 5, "Negative"),
        RowFactory.create("tragic evening event", 5, "Negative"),
        RowFactory.create("today is bad ", 5, "Negative")
    );
    List<Row> _testData = Arrays.asList(
        RowFactory.create("sunny morning", 1),
        RowFactory.create("bad evening", 5)
    );

    StructType schema = new StructType(new StructField[]{
        new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
        new StructField("time", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("sentiment", DataTypes.StringType, true, Metadata.empty())
    });
    StructType testSchema = new StructType(new StructField[]{
        new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
        new StructField("time", DataTypes.IntegerType, false, Metadata.empty())
    });

    Dataset<Row> trainData = sparkSession.createDataFrame(_trainData, schema);
    Dataset<Row> testData = sparkSession.createDataFrame(_testData, testSchema);

    StringIndexerModel labelIndexerModel = new StringIndexer()
        .setInputCol("sentiment")
        .setOutputCol("label")
        .setHandleInvalid("skip")
        .fit(trainData);
    Tokenizer tokenizer = new Tokenizer()
        .setInputCol("tweet")
        .setOutputCol("words");
    CountVectorizer countVectorizer = new CountVectorizer()
        .setInputCol(tokenizer.getOutputCol())
        .setOutputCol("wordfeatures")
        .setVocabSize(3)
        .setMinDF(2)
        .setMinTF(2)
        .setBinary(true);
    VectorAssembler vectorAssembler = new VectorAssembler()
        .setInputCols(new String[]{"wordfeatures", "time"})
        .setOutputCol("features");

    Dataset<Row> words = tokenizer.transform(trainData);
    CountVectorizerModel countVectorizerModel = countVectorizer.fit(words);

    LogisticRegression lr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.001);
    IndexToString labelConverter = new IndexToString()
        .setInputCol("prediction")
        .setOutputCol("predicted")
        .setLabels(labelIndexerModel.labels());
    countVectorizerModel.setMinTF(1);

    Pipeline pipeline = new Pipeline()
        .setStages(new PipelineStage[]{
            labelIndexerModel, tokenizer, countVectorizerModel, vectorAssembler, lr, labelConverter});
    ParamMap[] paramGrid = new ParamGridBuilder()
        .addGrid(lr.regParam(), new double[]{0.1, 0.01})
        .addGrid(lr.fitIntercept())
        .addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
        .build();
    MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator();
    evaluator.setLabelCol("label");
    evaluator.setPredictionCol("prediction");
    TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
        .setEstimator(pipeline)
        .setEvaluator(evaluator)
        .setEstimatorParamMaps(paramGrid)
        .setTrainRatio(0.7);

    // Fit the pipeline to training documents.
    TrainValidationSplitModel trainValidationSplitModel = trainValidationSplit.fit(trainData);
    trainValidationSplitModel.write().overwrite().save("/tmp/CountSplit.model");
    TrainValidationSplitModel _loadedModel = TrainValidationSplitModel.load("/tmp/CountSplit.model");
    PipelineModel loadedModel = (PipelineModel) (_loadedModel).bestModel();

    // Test on non-streaming data
    Dataset<Row> predicted = loadedModel.transform(testData);
    predicted.show();
    List<Row> _rows = predicted.select("tweet", "predicted").collectAsList();
    for (Row r : _rows) {
      System.out.println("[" + r.get(0) + "], prediction=" + r.get(1));
    }

    // Test on streaming data
    Dataset<Row> lines = sparkSession.readStream().option("sep", ",")
        .schema(testSchema).option("header", "true").option("inferSchema", "true")
        .format("com.databricks.spark.csv")
        .load("file:///home/davis/Documents/Bugs/StreamingTwitter1");
    StreamingQuery query = loadedModel.transform(lines).writeStream()
        .outputMode("append")
        .format("console")
        .start();
    query.awaitTermination();
  }
}

*Test data csv file*
tweet,time
Today is a bright sunny day,2
How is everyone feeling in office?,2
I want beef cake. Where is it?,2
The weather sucks today,2
I like Vat69.,5
I don't care,5
Wassup,5
Skyfall sucks!,5

*Output*
--------
+-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
|        tweet|time|           words| wordfeatures|         features|       rawPrediction|         probability|prediction|predicted|
+-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
|sunny morning|   1|[sunny, morning]|(3,[1],[1.0])|[0.0,1.0,0.0,1.0]|[3.33130861165765...|[0.96548740116159...|       0.0| Positive|
|  bad evening|   5|  [bad, evening]|(3,[0],[1.0])|[1.0,0.0,0.0,5.0]|[-4.4513631975340...|[0.01152820807912...|       1.0| Negative|
+-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+

[sunny morning], prediction=Positive
[bad evening], prediction=Negative

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[file:///home/davis/Documents/Bugs/StreamingTwitter1]
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2160)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:2167)
    at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
    at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
    at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:305)
    at StreamingIssueCountVectorizerSplit.main(StreamingIssueCountVectorizerSplit.java:164)
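For what it's worth, the trace also shows where this comes from: VectorAssembler.transform falls back to Dataset.first() when an input vector column (here "wordfeatures") carries no size information in its column metadata, and first() is an eager action that a streaming Dataset cannot run, hence the AnalysisException. An untested sketch of the kind of workaround I have in mind, assuming the VectorSizeHint transformer available in newer Spark releases (org.apache.spark.ml.feature.VectorSizeHint), is to declare the vector size before the assembler so the metadata is already in place:

// import org.apache.spark.ml.feature.VectorSizeHint;
// Untested sketch (newer Spark API): attach a fixed size to the CountVectorizer output
// so VectorAssembler can read it from metadata instead of calling first().
VectorSizeHint wordFeaturesSizeHint = new VectorSizeHint()
    .setInputCol("wordfeatures")
    .setSize(3)                       // same as the CountVectorizer vocabSize above
    .setHandleInvalid("error");

Pipeline pipeline = new Pipeline()
    .setStages(new PipelineStage[]{
        labelIndexerModel, tokenizer, countVectorizerModel,
        wordFeaturesSizeHint, vectorAssembler, lr, labelConverter});

With the size declared up front, the assembler should be able to build the "features" column from metadata alone and never touch first() on the streaming input.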