[
https://issues.apache.org/jira/browse/SPARK-23288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387266#comment-16387266
]
Apache Spark commented on SPARK-23288:
--------------------------------------
User 'gaborgsomogyi' has created a pull request for this issue:
https://github.com/apache/spark/pull/20745
> Incorrect number of written records in structured streaming
> -----------------------------------------------------------
>
> Key: SPARK-23288
> URL: https://issues.apache.org/jira/browse/SPARK-23288
> Project: Spark
> Issue Type: Bug
> Components: SQL, Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Yuriy Bondaruk
> Priority: Major
> Labels: Metrics, metrics
>
> I'm using SparkListener.onTaskEnd() to capture input and output metrics but
> it seems that number of written records
> ('taskEnd.taskMetrics().outputMetrics().recordsWritten()') is incorrect. Here
> is my stream construction:
>
> {code:java}
> StreamingQuery writeStream = session
> .readStream()
> .schema(RecordSchema.fromClass(TestRecord.class))
> .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
> .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
> .csv(inputFolder.getRoot().toPath().toString())
> .as(Encoders.bean(TestRecord.class))
> .flatMap(
> ((FlatMapFunction<TestRecord, TestVendingRecord>) (u) -> {
> List<TestVendingRecord> resultIterable = new ArrayList<>();
> try {
> TestVendingRecord result = transformer.convert(u);
> resultIterable.add(result);
> } catch (Throwable t) {
> System.err.println("Ooops");
> t.printStackTrace();
> }
> return resultIterable.iterator();
> }),
> Encoders.bean(TestVendingRecord.class))
> .writeStream()
> .outputMode(OutputMode.Append())
> .format("parquet")
> .option("path", outputFolder.getRoot().toPath().toString())
> .option("checkpointLocation",
> checkpointFolder.getRoot().toPath().toString())
> .start();
> writeStream.processAllAvailable();
> writeStream.stop();
> {code}
> Tested it with one good and one bad (throwing exception in
> transformer.convert(u)) input records and it produces following metrics:
>
> {code:java}
> (TestMain.java:onTaskEnd(73)) - -----------status--> SUCCESS
> (TestMain.java:onTaskEnd(75)) - -----------recordsWritten--> 0
> (TestMain.java:onTaskEnd(76)) - -----------recordsRead-----> 2
> (TestMain.java:onTaskEnd(83)) - taskEnd.taskInfo().accumulables():
> (TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
> (TestMain.java:onTaskEnd(85)) - value = 323
> (TestMain.java:onTaskEnd(84)) - name = number of output rows
> (TestMain.java:onTaskEnd(85)) - value = 2
> (TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
> (TestMain.java:onTaskEnd(85)) - value = 364
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.recordsRead
> (TestMain.java:onTaskEnd(85)) - value = 2
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.bytesRead
> (TestMain.java:onTaskEnd(85)) - value = 157
> (TestMain.java:onTaskEnd(84)) - name =
> internal.metrics.resultSerializationTime
> (TestMain.java:onTaskEnd(85)) - value = 3
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSize
> (TestMain.java:onTaskEnd(85)) - value = 2396
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorCpuTime
> (TestMain.java:onTaskEnd(85)) - value = 633807000
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorRunTime
> (TestMain.java:onTaskEnd(85)) - value = 683
> (TestMain.java:onTaskEnd(84)) - name =
> internal.metrics.executorDeserializeCpuTime
> (TestMain.java:onTaskEnd(85)) - value = 55662000
> (TestMain.java:onTaskEnd(84)) - name =
> internal.metrics.executorDeserializeTime
> (TestMain.java:onTaskEnd(85)) - value = 58
> (TestMain.java:onTaskEnd(89)) - input records 2
> Streaming query made progress: {
> "id" : "1231f9cb-b2e8-4d10-804d-73d7826c1cb5",
> "runId" : "bd23b60c-93f9-4e17-b3bc-55403edce4e7",
> "name" : null,
> "timestamp" : "2018-01-26T14:44:05.362Z",
> "numInputRows" : 2,
> "processedRowsPerSecond" : 0.8163265306122448,
> "durationMs" : {
> "addBatch" : 1994,
> "getBatch" : 126,
> "getOffset" : 52,
> "queryPlanning" : 220,
> "triggerExecution" : 2450,
> "walCommit" : 41
> },
> "stateOperators" : [ ],
> "sources" : [ {
> "description" :
> "FileStreamSource[file:/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3661035412295337071]",
> "startOffset" : null,
> "endOffset" : {
> "logOffset" : 0
> },
> "numInputRows" : 2,
> "processedRowsPerSecond" : 0.8163265306122448
> } ],
> "sink" : {
> "description" :
> "FileSink[/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3785605384928624065]"
> }
> }
> {code}
> The number of inputs is correct but the number of output records taken from
> taskEnd.taskMetrics().outputMetrics().recordsWritten() is zero. Accumulables
> (taskEnd.taskInfo().accumulables()) don't have a correct value as well -
> should be 1 but it shows 2 'number of output rows'.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]