Bago,

I have finally been able to create an example which fails consistently. I think
the issue is caused by the VectorAssembler in the model. In the new code, I have
2 features (1 text and 1 number), and I have to run them through a
VectorAssembler before passing them to LogisticRegression. Code and test data
are below.

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.ml.tuning.TrainValidationSplit;
import org.apache.spark.ml.tuning.TrainValidationSplitModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Reproduction program: trains a sentiment-classification pipeline on a small
 * in-memory data set using {@code TrainValidationSplit}, saves and reloads the
 * resulting model, and then applies the best {@code PipelineModel} first to a
 * batch Dataset and afterwards to a streaming Dataset.
 *
 * <p>The pipeline has two input features: the text column "tweet" (turned into
 * "wordfeatures" by a Tokenizer and a CountVectorizerModel) and the numeric
 * column "time". A VectorAssembler merges both into the "features" column that
 * LogisticRegression consumes.
 *
 * <p>According to the output and stack trace reported together with this
 * program, the batch prediction succeeds, while the streaming
 * {@code loadedModel.transform(lines)} call throws an AnalysisException that
 * originates inside {@code VectorAssembler.transform}.
 */
public class StreamingIssueCountVectorizerSplitFailed {

  /**
   * Runs the whole reproduction: build data, train, persist, reload, predict
   * on batch data, then predict on streaming data.
   *
   * @param args command-line arguments; not used
   * @throws Exception any failure raised while training, saving/loading the
   *     model, or running the streaming query is propagated unchanged
   */
  public static void main(String[] args) throws Exception {
    // Spark session running locally (master "local[2]").
    SparkSession sparkSession =
SparkSession.builder().appName("StreamingIssueCountVectorizer")
        .master("local[2]")
        .getOrCreate();

    // Training rows: (tweet text, time, sentiment label).
    List<Row> _trainData = Arrays.asList(
        RowFactory.create("sunny fantastic day", 1, "Positive"),
        RowFactory.create("fantastic morning match", 1, "Positive"),
        RowFactory.create("good morning", 1, "Positive"),
        RowFactory.create("boring evening", 5, "Negative"),
        RowFactory.create("tragic evening event", 5, "Negative"),
        RowFactory.create("today is bad ", 5, "Negative")
    );
    // Test rows: (tweet text, time) only; there is no sentiment column.
    List<Row> _testData = Arrays.asList(
        RowFactory.create("sunny morning", 1),
        RowFactory.create("bad evening", 5)
    );
    // Schema of the training data: tweet, time, sentiment.
    StructType schema = new StructType(new StructField[]{
        new StructField("tweet", DataTypes.StringType, false,
Metadata.empty()),
        new StructField("time", DataTypes.IntegerType, false,
Metadata.empty()),
        new StructField("sentiment", DataTypes.StringType, true,
Metadata.empty())
    });
    // Schema of the test data; also used below for the streaming CSV source.
    StructType testSchema = new StructType(new StructField[]{
        new StructField("tweet", DataTypes.StringType, false,
Metadata.empty()),
        new StructField("time", DataTypes.IntegerType, false,
Metadata.empty())
    });

    Dataset<Row> trainData = sparkSession.createDataFrame(_trainData,
schema);
    Dataset<Row> testData = sparkSession.createDataFrame(_testData,
testSchema);
    // The label indexer is fitted up front (outside the pipeline) so that its
    // labels can be handed to the IndexToString stage further down.
    StringIndexerModel labelIndexerModel = new StringIndexer()
        .setInputCol("sentiment")
        .setOutputCol("label")
        .setHandleInvalid("skip")
        .fit(trainData);
    // Text feature, step 1: split "tweet" into "words".
    Tokenizer tokenizer = new Tokenizer()
        .setInputCol("tweet")
        .setOutputCol("words");
    // Text feature, step 2: turn "words" into the "wordfeatures" vector.
    CountVectorizer countVectorizer = new CountVectorizer()
        .setInputCol(tokenizer.getOutputCol())
        .setOutputCol("wordfeatures")
        .setVocabSize(3)
        .setMinDF(2)
        .setMinTF(2)
        .setBinary(true);

    // Combines the text vector and the numeric "time" column into "features".
    // This is the stage that appears in the reported stack trace.
    VectorAssembler vectorAssembler = new VectorAssembler()
        .setInputCols(new String[]{"wordfeatures", "time"}).
            setOutputCol("features");

    // The CountVectorizer is also fitted outside the pipeline, on the
    // tokenized training data; the fitted model is used as a pipeline stage.
    Dataset<Row> words = tokenizer.transform(trainData);
    CountVectorizerModel countVectorizerModel = countVectorizer.fit(words);

    LogisticRegression lr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.001);

    // Maps the numeric "prediction" back to the original sentiment strings.
    IndexToString labelConverter = new IndexToString()
        .setInputCol("prediction")
        .setOutputCol("predicted")
        .setLabels(labelIndexerModel.labels());

    // Set minTF to 1 on the fitted model (the estimator above used 2).
    countVectorizerModel.setMinTF(1);

    // Stage order: label indexing, tokenizing, count vectorizing, vector
    // assembling, logistic regression, label conversion.
    Pipeline pipeline = new Pipeline()
        .setStages(
            new PipelineStage[]{labelIndexerModel, tokenizer,
countVectorizerModel, vectorAssembler,
                lr, labelConverter});
    // Hyper-parameter grid over regParam, fitIntercept and elasticNetParam of
    // the logistic regression.
    ParamMap[] paramGrid = new ParamGridBuilder()
        .addGrid(lr.regParam(), new double[]{0.1, 0.01})
        .addGrid(lr.fitIntercept())
        .addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
        .build();

    // Evaluator comparing the "label" column with the "prediction" column.
    MulticlassClassificationEvaluator evaluator = new
MulticlassClassificationEvaluator();
    evaluator.setLabelCol("label");
    evaluator.setPredictionCol("prediction");

    // Model selection over the grid, with a train ratio of 0.7.
    TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
        .setEstimator(pipeline)
        .setEvaluator(evaluator)
        .setEstimatorParamMaps(paramGrid)
        .setTrainRatio(0.7);

    // Fit the pipeline (for every grid entry) on the training data.
    TrainValidationSplitModel trainValidationSplitModel =
trainValidationSplit.fit(trainData);

    // Persist the fitted model, overwriting any previous copy at this path.
   
trainValidationSplitModel.write().overwrite().save("/tmp/CountSplit.model");

    // Reload the model from disk and take the best model as a PipelineModel.
    TrainValidationSplitModel _loadedModel = TrainValidationSplitModel
        .load("/tmp/CountSplit.model");
    PipelineModel loadedModel = (PipelineModel) (_loadedModel).bestModel();

    // Test on non-streaming (batch) data: show the full result and print the
    // tweet together with its predicted sentiment.
    Dataset<Row> predicted = loadedModel.transform(testData);
    predicted.show();
    List<Row> _rows = predicted.select("tweet",
"predicted").collectAsList();
    for (Row r : _rows) {
      System.out.println("[" + r.get(0) + "], prediction=" + r.get(1));
    }

    // Test on streaming data: read CSV files with the test schema from a
    // hard-coded local directory.
    // NOTE(review): presumably this directory holds the CSV shown in the
    // report; adjust the path to run the reproduction elsewhere.

    Dataset<Row> lines = sparkSession.readStream().option("sep", ",")
        .schema(testSchema).option("header", "true").option("inferSchema",
"true")
        .format("com.databricks.spark.csv")
        .load("file:///home/davis/Documents/Bugs/StreamingTwitter1");

    // Apply the reloaded pipeline to the stream and write results to the
    // console in append mode. Per the reported stack trace, the exception is
    // raised by loadedModel.transform(lines), before the query is started.
    StreamingQuery query = loadedModel.transform(lines).writeStream()
        .outputMode("append")
        .format("console")
        .start();

    // Block until the streaming query terminates.
    query.awaitTermination();

  }
}

*##Test data csv file*
tweet,time
Today is a bright sunny day,2
How is everyone feeling in office?,2
I want beef cake. Where is it?,2
The weather sucks today,2
I like Vat69.,5
I don't care,5
Wassup,5
Skyfall sucks!,5


*Output*
*--------*


+-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
|        tweet|time|           words| wordfeatures|         features|       rawPrediction|         probability|prediction|predicted|
+-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+
|sunny morning|   1|[sunny, morning]|(3,[1],[1.0])|[0.0,1.0,0.0,1.0]|[3.33130861165765...|[0.96548740116159...|       0.0| Positive|
|  bad evening|   5|  [bad, evening]|(3,[0],[1.0])|[1.0,0.0,0.0,5.0]|[-4.4513631975340...|[0.01152820807912...|       1.0| Negative|
+-------------+----+----------------+-------------+-----------------+--------------------+--------------------+----------+---------+

[sunny morning], prediction=Positive
[bad evening], prediction=Negative
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries
with streaming sources must be executed with writeStream.start();;
FileSource[file:///home/davis/Documents/Bugs/StreamingTwitter1]
        at
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
        at
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
        at
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
        at
org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
        at
org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
        at
org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
        at
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
        at
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
        at
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
        at
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
        at
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
        at
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2160)
        at org.apache.spark.sql.Dataset.first(Dataset.scala:2167)
        at
org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
        at
org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
        at
org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
        at
org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
        at
org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
        at scala.Option.getOrElse(Option.scala:121)
        at
org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
        at
org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
        at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
        at
org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
        at
org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
        at
org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
        at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
        at
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
        at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
        at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:305)
        at
StreamingIssueCountVectorizerSplit.main(StreamingIssueCountVectorizerSplit.java:164)




--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org

Reply via email to