This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new ab8072375 [client] Improve: add detailed context to exception messages 
(#1806)
ab8072375 is described below

commit ab8072375ece990105880ad3a0504a8433bd60e2
Author: CodeDrinks <[email protected]>
AuthorDate: Tue Oct 14 16:42:03 2025 +0530

    [client] Improve: add detailed context to exception messages (#1806)
---
 .../fluss/client/table/scanner/TableScan.java      | 25 +++++++--
 .../apache/fluss/client/write/WriterClient.java    | 59 +++++++++++++++-------
 2 files changed, 60 insertions(+), 24 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
index 1cfe3aa5f..7fade0547 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
@@ -74,7 +74,11 @@ public class TableScan implements Scan {
             int index = rowType.getFieldIndex(projectedColumnNames.get(i));
             if (index < 0) {
                 throw new IllegalArgumentException(
-                        "Field " + projectedColumnNames.get(i) + " not found 
in table schema.");
+                        String.format(
+                                "Field '%s' not found in table schema. 
Available fields: %s, Table: %s",
+                                projectedColumnNames.get(i),
+                                rowType.getFieldNames(),
+                                tableInfo.getTablePath()));
             }
             columnIndexes[i] = index;
         }
@@ -89,7 +93,10 @@ public class TableScan implements Scan {
     @Override
     public LogScanner createLogScanner() {
         if (limit != null) {
-            throw new UnsupportedOperationException("LogScanner doesn't 
support limit pushdown.");
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "LogScanner doesn't support limit pushdown. Table: 
%s, requested limit: %d",
+                            tableInfo.getTablePath(), limit));
         }
         return new LogScannerImpl(
                 conn.getConfiguration(),
@@ -104,7 +111,9 @@ public class TableScan implements Scan {
     public BatchScanner createBatchScanner(TableBucket tableBucket) {
         if (limit == null) {
             throw new UnsupportedOperationException(
-                    "Currently, BatchScanner is only available when limit is 
set.");
+                    String.format(
+                            "Currently, BatchScanner is only available when 
limit is set. Table: %s, bucket: %s",
+                            tableInfo.getTablePath(), tableBucket));
         }
         return new LimitBatchScanner(
                 tableInfo, tableBucket, conn.getMetadataUpdater(), 
projectedColumns, limit);
@@ -114,7 +123,9 @@ public class TableScan implements Scan {
     public BatchScanner createBatchScanner(TableBucket tableBucket, long 
snapshotId) {
         if (limit != null) {
             throw new UnsupportedOperationException(
-                    "Currently, SnapshotBatchScanner doesn't support limit 
pushdown.");
+                    String.format(
+                            "Currently, SnapshotBatchScanner doesn't support 
limit pushdown. Table: %s, bucket: %s, snapshot ID: %d, requested limit: %d",
+                            tableInfo.getTablePath(), tableBucket, snapshotId, 
limit));
         }
         String scannerTmpDir =
                 
conn.getConfiguration().getString(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR);
@@ -123,7 +134,11 @@ public class TableScan implements Scan {
         try {
             snapshotMeta = admin.getKvSnapshotMetadata(tableBucket, 
snapshotId).get();
         } catch (Exception e) {
-            throw new FlussRuntimeException("Failed to get snapshot metadata", 
e);
+            throw new FlussRuntimeException(
+                    String.format(
+                            "Failed to get snapshot metadata for table bucket 
%s, snapshot ID: %d, Table: %s",
+                            tableBucket, snapshotId, tableInfo.getTablePath()),
+                    e);
         }
 
         return new KvSnapshotBatchScanner(
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
index 0af0b2859..e91c04843 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
@@ -92,12 +92,16 @@ public class WriterClient {
             MetadataUpdater metadataUpdater,
             ClientMetricGroup clientMetricGroup,
             Admin admin) {
+        int maxRequestSizeLocal = -1;
+        IdempotenceManager idempotenceManagerLocal = null;
         try {
             this.conf = conf;
             this.metadataUpdater = metadataUpdater;
-            this.maxRequestSize =
+            maxRequestSizeLocal =
                     (int) 
conf.get(ConfigOptions.CLIENT_WRITER_REQUEST_MAX_SIZE).getBytes();
-            this.idempotenceManager = buildIdempotenceManager();
+            this.maxRequestSize = maxRequestSizeLocal;
+            idempotenceManagerLocal = buildIdempotenceManager();
+            this.idempotenceManager = idempotenceManagerLocal;
             this.writerMetricGroup = new WriterMetricGroup(clientMetricGroup);
 
             short acks = 
configureAcks(idempotenceManager.idempotenceEnabled());
@@ -117,7 +121,14 @@ public class WriterClient {
                             this::maybeAbortBatches);
         } catch (Throwable t) {
             close(Duration.ofMillis(0));
-            throw new FlussRuntimeException("Failed to construct writer", t);
+            throw new FlussRuntimeException(
+                    String.format(
+                            "Failed to construct writer. Max request size: %d 
bytes, Idempotence enabled: %b",
+                            maxRequestSizeLocal,
+                            idempotenceManagerLocal != null
+                                    ? 
idempotenceManagerLocal.idempotenceEnabled()
+                                    : false),
+                    t);
         }
     }
 
@@ -148,7 +159,11 @@ public class WriterClient {
         try {
             accumulator.awaitFlushCompletion();
         } catch (InterruptedException e) {
-            throw new FlussRuntimeException("Flush interrupted." + e);
+            throw new FlussRuntimeException(
+                    String.format(
+                            "Flush interrupted after %d ms. Writer may be in 
inconsistent state",
+                            System.currentTimeMillis() - start),
+                    e);
         }
         LOG.trace(
                 "Flushed accumulated records in writer in {} ms.",
@@ -196,7 +211,12 @@ public class WriterClient {
                 // TODO add the wakeup logic refer to Kafka.
             }
         } catch (Exception e) {
-            throw new FlussRuntimeException(e);
+            throw new FlussRuntimeException(
+                    String.format(
+                            "Failed to send record to table %s. Writer state: 
%s",
+                            record.getPhysicalTablePath(),
+                            sender != null && sender.isRunning() ? "running" : 
"closed"),
+                    e);
         }
     }
 
@@ -212,7 +232,10 @@ public class WriterClient {
     private void throwIfWriterClosed() {
         if (sender == null || !sender.isRunning()) {
             throw new IllegalStateException(
-                    "Cannot perform operation after writer has been closed");
+                    String.format(
+                            "Cannot perform write operation after writer has 
been closed. Sender running: %b, Thread pool shutdown: %b",
+                            sender != null && sender.isRunning(),
+                            ioThreadPool == null || 
ioThreadPool.isShutdown()));
         }
     }
 
@@ -225,11 +248,11 @@ public class WriterClient {
                 && maxInflightRequestPerBucket
                         > MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE) {
             throw new IllegalConfigurationException(
-                    "The value of "
-                            + 
ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET.key()
-                            + " should be less than or equal to "
-                            + MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE
-                            + " when idempotence writer enabled to ensure 
message ordering.");
+                    String.format(
+                            "Invalid configuration for idempotent writer. The 
value of %s (%d) should be less than or equal to %d when idempotence is enabled 
to ensure message ordering",
+                            
ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET.key(),
+                            maxInflightRequestPerBucket,
+                            
MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE));
         }
 
         TabletServerGateway tabletServerGateway = 
metadataUpdater.newRandomTabletServerClient();
@@ -249,10 +272,9 @@ public class WriterClient {
 
         if (idempotenceEnabled && ack != -1) {
             throw new IllegalConfigurationException(
-                    "Must set "
-                            + ConfigOptions.CLIENT_WRITER_ACKS.key()
-                            + " to 'all' in order to use the idempotent 
writer. Otherwise "
-                            + "we cannot guarantee idempotence.");
+                    String.format(
+                            "Invalid acks configuration for idempotent writer. 
Must set %s to 'all' (current value: '%s') in order to use the idempotent 
writer. Otherwise we cannot guarantee idempotence",
+                            ConfigOptions.CLIENT_WRITER_ACKS.key(), acks));
         }
 
         return ack;
@@ -262,10 +284,9 @@ public class WriterClient {
         int retries = conf.getInt(ConfigOptions.CLIENT_WRITER_RETRIES);
         if (idempotenceEnabled && retries == 0) {
             throw new IllegalConfigurationException(
-                    "Must set "
-                            + ConfigOptions.CLIENT_WRITER_RETRIES.key()
-                            + " to non-zero when using the idempotent writer. 
Otherwise "
-                            + "we cannot guarantee idempotence.");
+                    String.format(
+                            "Invalid retries configuration for idempotent 
writer. Must set %s to non-zero (current value: %d) when using the idempotent 
writer. Otherwise we cannot guarantee idempotence",
+                            ConfigOptions.CLIENT_WRITER_RETRIES.key(), 
retries));
         }
         return retries;
     }

Reply via email to