This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 20cc51fec [spark] Report output metric when writing (#3927)
20cc51fec is described below

commit 20cc51fec605c426c4e16ad64e7c27b5e8126696
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Aug 11 19:44:37 2024 +0800

    [spark] Report output metric when writing (#3927)
---
 paimon-spark/paimon-spark-3.1/pom.xml              | 37 ++++++++++
 paimon-spark/paimon-spark-3.2/pom.xml              | 37 ++++++++++
 paimon-spark/paimon-spark-3.3/pom.xml              | 37 ++++++++++
 paimon-spark/paimon-spark-3.4/pom.xml              | 37 ++++++++++
 paimon-spark/paimon-spark-3.5/pom.xml              | 37 ++++++++++
 paimon-spark/paimon-spark-common/pom.xml           | 37 ++++++++++
 .../org/apache/paimon/spark/SparkTableWrite.java   | 78 --------------------
 .../org/apache/paimon/spark/SparkTableWrite.scala  | 82 ++++++++++++++++++++++
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  6 +-
 .../scala/org/apache/spark/sql/PaimonUtils.scala   |  9 +++
 .../apache/paimon/spark/sql/PaimonMetricTest.scala | 26 +++++++
 11 files changed, 342 insertions(+), 81 deletions(-)

diff --git a/paimon-spark/paimon-spark-3.1/pom.xml b/paimon-spark/paimon-spark-3.1/pom.xml
index ba1f1d27e..0fcf1bdeb 100644
--- a/paimon-spark/paimon-spark-3.1/pom.xml
+++ b/paimon-spark/paimon-spark-3.1/pom.xml
@@ -91,6 +91,43 @@ under the License.
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/ SLF4J1 -->
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.12</artifactId>
diff --git a/paimon-spark/paimon-spark-3.2/pom.xml b/paimon-spark/paimon-spark-3.2/pom.xml
index 07db172ef..865f3a9ee 100644
--- a/paimon-spark/paimon-spark-3.2/pom.xml
+++ b/paimon-spark/paimon-spark-3.2/pom.xml
@@ -95,6 +95,43 @@ under the License.
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/ SLF4J1 -->
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-spark-common</artifactId>
diff --git a/paimon-spark/paimon-spark-3.3/pom.xml b/paimon-spark/paimon-spark-3.3/pom.xml
index 1405bea61..8345f5b4a 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.3/pom.xml
@@ -91,6 +91,43 @@ under the License.
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/ SLF4J1 -->
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-spark-common</artifactId>
diff --git a/paimon-spark/paimon-spark-3.4/pom.xml b/paimon-spark/paimon-spark-3.4/pom.xml
index 93ce44705..47e1952b6 100644
--- a/paimon-spark/paimon-spark-3.4/pom.xml
+++ b/paimon-spark/paimon-spark-3.4/pom.xml
@@ -91,6 +91,43 @@ under the License.
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/ SLF4J1 -->
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-spark-common</artifactId>
diff --git a/paimon-spark/paimon-spark-3.5/pom.xml b/paimon-spark/paimon-spark-3.5/pom.xml
index d37f1f10f..1794dfd93 100644
--- a/paimon-spark/paimon-spark-3.5/pom.xml
+++ b/paimon-spark/paimon-spark-3.5/pom.xml
@@ -87,6 +87,43 @@ under the License.
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/ SLF4J1 -->
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-spark-common</artifactId>
diff --git a/paimon-spark/paimon-spark-common/pom.xml b/paimon-spark/paimon-spark-common/pom.xml
index f08d86467..17e4baa3c 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -94,6 +94,43 @@ under the License.
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/ SLF4J1 -->
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_2.12</artifactId>
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTableWrite.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTableWrite.java
deleted file mode 100644
index 3e8926080..000000000
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTableWrite.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark;
-
-import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.spark.util.SparkRowUtils;
-import org.apache.paimon.table.sink.BatchTableWrite;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.CommitMessageSerializer;
-import org.apache.paimon.types.RowType;
-
-import org.apache.spark.sql.Row;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/** An util class for {@link BatchTableWrite}. */
-public class SparkTableWrite implements AutoCloseable {
-
-    private final BatchTableWrite write;
-    private final IOManager ioManager;
-
-    private final RowType rowType;
-    private final int rowKindColIdx;
-
-    public SparkTableWrite(BatchWriteBuilder writeBuilder, RowType rowType, int rowKindColIdx) {
-        this.write = writeBuilder.newWrite();
-        this.rowType = rowType;
-        this.rowKindColIdx = rowKindColIdx;
-        this.ioManager = SparkUtils.createIOManager();
-        write.withIOManager(ioManager);
-    }
-
-    public void write(Row row) throws Exception {
-        write.write(toPaimonRow(row));
-    }
-
-    public void write(Row row, int bucket) throws Exception {
-        write.write(toPaimonRow(row), bucket);
-    }
-
-    public Iterator<byte[]> finish() throws Exception {
-        CommitMessageSerializer serializer = new CommitMessageSerializer();
-        List<byte[]> commitMessages = new ArrayList<>();
-        for (CommitMessage message : write.prepareCommit()) {
-            commitMessages.add(serializer.serialize(message));
-        }
-        return commitMessages.iterator();
-    }
-
-    @Override
-    public void close() throws Exception {
-        write.close();
-        ioManager.close();
-    }
-
-    private SparkRow toPaimonRow(Row row) {
-        return new SparkRow(rowType, row, SparkRowUtils.getRowKind(row, rowKindColIdx));
-    }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
new file mode 100644
index 000000000..920d907e2
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.disk.IOManager
+import org.apache.paimon.spark.util.SparkRowUtils
+import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessageImpl, CommitMessageSerializer}
+import org.apache.paimon.types.RowType
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.{PaimonUtils, Row}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+class SparkTableWrite(writeBuilder: BatchWriteBuilder, rowType: RowType, rowKindColIdx: Int)
+  extends AutoCloseable {
+
+  val ioManager: IOManager = SparkUtils.createIOManager
+  val write: BatchTableWrite =
+    writeBuilder.newWrite().withIOManager(ioManager).asInstanceOf[BatchTableWrite]
+
+  def write(row: Row): Unit = {
+    write.write(toPaimonRow(row))
+  }
+
+  def write(row: Row, bucket: Int): Unit = {
+    write.write(toPaimonRow(row), bucket)
+  }
+
+  def finish(): Iterator[Array[Byte]] = {
+    var bytesWritten = 0L
+    var recordsWritten = 0L
+    val commitMessages = new ListBuffer[Array[Byte]]()
+    val serializer = new CommitMessageSerializer()
+    write.prepareCommit().asScala.foreach {
+      case message: CommitMessageImpl =>
+        message.newFilesIncrement().newFiles().asScala.foreach {
+          dataFileMeta =>
+            bytesWritten += dataFileMeta.fileSize()
+            recordsWritten += dataFileMeta.rowCount()
+        }
+        commitMessages += serializer.serialize(message)
+    }
+    reportOutputMetrics(bytesWritten, recordsWritten)
+    commitMessages.iterator
+  }
+
+  override def close(): Unit = {
+    write.close()
+    ioManager.close()
+  }
+
+  private def toPaimonRow(row: Row) =
+    new SparkRow(rowType, row, SparkRowUtils.getRowKind(row, rowKindColIdx))
+
+  private def reportOutputMetrics(bytesWritten: Long, recordsWritten: Long): Unit = {
+    val taskContext = TaskContext.get
+    if (taskContext != null) {
+      PaimonUtils.updateOutputMetrics(
+        taskContext.taskMetrics.outputMetrics,
+        bytesWritten,
+        recordsWritten)
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index a5681e54e..d12909b8a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -80,7 +80,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
             val write = newWrite()
             try {
               iter.foreach(row => write.write(row))
-              write.finish().asScala
+              write.finish()
             } finally {
               write.close()
             }
@@ -99,7 +99,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
             val write = newWrite()
             try {
               iter.foreach(row => write.write(row, row.getInt(bucketColIdx)))
-              write.finish().asScala
+              write.finish()
             } finally {
               write.close()
             }
@@ -117,7 +117,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
             val write = newWrite()
             try {
               iter.foreach(row => write.write(row, assigner.apply(row)))
-              write.finish().asScala
+              write.finish()
             } finally {
               write.close()
             }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 06a255ebb..658d43848 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -18,6 +18,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.rdd.InputFileBlockHolder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -78,4 +79,12 @@ object PaimonUtils {
   def unsetInputFileName(): Unit = {
     InputFileBlockHolder.unset()
   }
+
+  def updateOutputMetrics(
+      outputMetrics: OutputMetrics,
+      bytesWritten: Long,
+      recordsWritten: Long): Unit = {
+    outputMetrics.setBytesWritten(bytesWritten)
+    outputMetrics.setRecordsWritten(recordsWritten)
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
index f223dabdd..8866428f8 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.sql
 import org.apache.paimon.spark.PaimonMetrics.{RESULTED_TABLE_FILES, SKIPPED_TABLE_FILES}
 import org.apache.paimon.spark.PaimonSparkTestBase
 
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.junit.jupiter.api.Assertions
 
@@ -60,6 +61,31 @@ class PaimonMetricTest extends PaimonSparkTestBase {
     }
   }
 
+  test("Paimon Metric: report output metric") {
+    sql(s"CREATE TABLE T (id int)")
+
+    var recordsWritten = 0L
+    var bytesWritten = 0L
+
+    val listener = new SparkListener() {
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+        val outputMetrics = taskEnd.taskMetrics.outputMetrics
+        recordsWritten += outputMetrics.recordsWritten
+        bytesWritten += outputMetrics.bytesWritten
+      }
+    }
+
+    try {
+      spark.sparkContext.addSparkListener(listener)
+      sql(s"INSERT INTO T VALUES 1, 2, 3")
+    } finally {
+      spark.sparkContext.removeSparkListener(listener)
+    }
+
+    Assertions.assertEquals(3, recordsWritten)
+    Assertions.assertTrue(bytesWritten > 0)
+  }
+
   def metric(metrics: Array[CustomTaskMetric], name: String): Long = {
     metrics.find(_.name() == name).get.value()
   }

Reply via email to