This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 81d715d555355d7af928c24e164a626f75713afe Author: Grant Henke <[email protected]> AuthorDate: Mon May 6 08:52:15 2019 -0500 KUDU-2812: Fix error reporting in KuduRestore Before this patch, the call to `session.getPendingErrors.getRowErrors.length` cleared the pending-errors buffer, so the second call to `session.getPendingErrors`, made to report the sample errors, returned no errors. This patch ensures we capture the result of `session.getPendingErrors` before checking the length. Note: This patch also updates the message format for both kudu-backup and kudu-spark. Change-Id: Iadf3941614a4879a9f35d1df9ee0cea274711c94 Reviewed-on: http://gerrit.cloudera.org:8080/13244 Reviewed-by: Will Berkeley <[email protected]> Tested-by: Kudu Jenkins --- .../scala/org/apache/kudu/backup/KuduRestore.scala | 20 ++++++++++++++------ .../org/apache/kudu/spark/kudu/KuduContext.scala | 17 +++++++++++------ 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala index 77bddeb..3b6a537 100644 --- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala +++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala @@ -131,12 +131,20 @@ object KuduRestore { session.close() } // Fail the task if there are any errors. - val errorCount = session.getPendingErrors.getRowErrors.length - if (errorCount > 0) { - val errors = - session.getPendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString - throw new RuntimeException( - s"failed to write $errorCount rows from DataFrame to Kudu; sample errors: $errors") + // It is important to capture all of the errors via getRowErrors and then check + // the length because each call to session.getPendingErrors clears the ErrorCollector.
+ val pendingErrors = session.getPendingErrors + if (pendingErrors.getRowErrors.nonEmpty) { + val errors = pendingErrors.getRowErrors + val sample = errors.take(5).map(_.getErrorStatus).mkString + if (pendingErrors.isOverflowed) { + throw new RuntimeException( + s"PendingErrors overflowed. Failed to write at least ${errors.length} rows " + + s"to Kudu; Sample errors: $sample") + } else { + throw new RuntimeException( + s"Failed to write ${errors.length} rows to Kudu; Sample errors: $sample") + } } } } diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala index 28aec32..a27a1e7 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala @@ -355,12 +355,17 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou operation, lastPropagatedTimestamp, writeOptions) - val errorCount = pendingErrors.getRowErrors.length - if (errorCount > 0) { - val errors = - pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString - throw new RuntimeException( - s"failed to write $errorCount rows from DataFrame to Kudu; sample errors: $errors") + if (pendingErrors.getRowErrors.nonEmpty) { + val errors = pendingErrors.getRowErrors + val sample = errors.take(5).map(_.getErrorStatus).mkString + if (pendingErrors.isOverflowed) { + throw new RuntimeException( + s"PendingErrors overflowed. Failed to write at least ${errors.length} rows " + + s"to Kudu; Sample errors: $sample") + } else { + throw new RuntimeException( + s"Failed to write ${errors.length} rows to Kudu; Sample errors: $sample") + } } }) log.info(s"completed $operation ops: duration histogram: $durationHistogram")
