This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7220d04  Fix truncation detectability for SQL array, object formats. (#11685)
7220d04 is described below

commit 7220d0466bc31dda4b5396742ad08e18f9e01250
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Sep 14 15:59:05 2021 -0700

    Fix truncation detectability for SQL array, object formats. (#11685)
    
    The SQL "array" and "object" formats are intended to return invalid JSON
    (lacking a ] terminator) if an error occurs midstream. This enables callers
    to detect truncated responses. But Jackson JsonGenerators, by default
    (AUTO_CLOSE_JSON_CONTENT), close any still-open JSON arrays when the
    generator itself is closed, even when not explicitly told to.
    
    This patch disables automatic array closing, which fixes the problem with
    truncated response detection. It also adds tests for truncated responses
    for all result formats.
---
 .../org/apache/druid/sql/http/ArrayWriter.java     |   3 +
 .../org/apache/druid/sql/http/ObjectWriter.java    |   3 +
 .../org/apache/druid/sql/http/SqlResourceTest.java | 113 +++++++++++++++++++--
 3 files changed, 113 insertions(+), 6 deletions(-)

diff --git a/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java 
b/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java
index c177cf3..678df49 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java
@@ -36,6 +36,9 @@ public class ArrayWriter implements ResultFormat.Writer
   {
     this.jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream);
     this.outputStream = outputStream;
+
+    // Disable automatic JSON termination, so clients can detect truncated 
responses.
+    jsonGenerator.configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, 
false);
   }
 
   @Override
diff --git a/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java 
b/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java
index b1623a5..ac7b0cf 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java
@@ -36,6 +36,9 @@ public class ObjectWriter implements ResultFormat.Writer
   {
     this.jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream);
     this.outputStream = outputStream;
+
+    // Disable automatic JSON termination, so clients can detect truncated 
responses.
+    jsonGenerator.configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, 
false);
   }
 
   @Override
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java 
b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index 02f596a..e1d68ef 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -95,9 +95,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -123,6 +125,7 @@ public class SqlResourceTest extends CalciteTestBase
   private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
validateAndAuthorizeLatchSupplier = new SettableSupplier<>();
   private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
planLatchSupplier = new SettableSupplier<>();
   private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
executeLatchSupplier = new SettableSupplier<>();
+  private final SettableSupplier<Function<Sequence<Object[]>, 
Sequence<Object[]>>> sequenceMapFnSupplier = new SettableSupplier<>();
 
   private boolean sleep = false;
 
@@ -254,7 +257,8 @@ public class SqlResourceTest extends CalciteTestBase
                 System.nanoTime(),
                 validateAndAuthorizeLatchSupplier,
                 planLatchSupplier,
-                executeLatchSupplier
+                executeLatchSupplier,
+                sequenceMapFnSupplier
             );
           }
         },
@@ -506,6 +510,76 @@ public class SqlResourceTest extends CalciteTestBase
   }
 
   @Test
+  public void testArrayResultFormatWithErrorAfterFirstRow() throws Exception
+  {
+    sequenceMapFnSupplier.set(errorAfterSecondRowMapFn());
+
+    final String query = "SELECT cnt FROM foo";
+    final Pair<QueryException, String> response =
+        doPostRaw(new SqlQuery(query, ResultFormat.ARRAY, false, null, null), 
req);
+
+    // Truncated response: missing final ]
+    Assert.assertNull(response.lhs);
+    Assert.assertEquals("[[1],[1]", response.rhs);
+  }
+
+  @Test
+  public void testObjectResultFormatWithErrorAfterFirstRow() throws Exception
+  {
+    sequenceMapFnSupplier.set(errorAfterSecondRowMapFn());
+
+    final String query = "SELECT cnt FROM foo";
+    final Pair<QueryException, String> response =
+        doPostRaw(new SqlQuery(query, ResultFormat.OBJECT, false, null, null), 
req);
+
+    // Truncated response: missing final ]
+    Assert.assertNull(response.lhs);
+    Assert.assertEquals("[{\"cnt\":1},{\"cnt\":1}", response.rhs);
+  }
+
+  @Test
+  public void testArrayLinesResultFormatWithErrorAfterFirstRow() throws 
Exception
+  {
+    sequenceMapFnSupplier.set(errorAfterSecondRowMapFn());
+
+    final String query = "SELECT cnt FROM foo";
+    final Pair<QueryException, String> response =
+        doPostRaw(new SqlQuery(query, ResultFormat.ARRAYLINES, false, null, 
null), req);
+
+    // Truncated response: missing final LFLF
+    Assert.assertNull(response.lhs);
+    Assert.assertEquals("[1]\n[1]", response.rhs);
+  }
+
+  @Test
+  public void testObjectLinesResultFormatWithErrorAfterFirstRow() throws 
Exception
+  {
+    sequenceMapFnSupplier.set(errorAfterSecondRowMapFn());
+
+    final String query = "SELECT cnt FROM foo";
+    final Pair<QueryException, String> response =
+        doPostRaw(new SqlQuery(query, ResultFormat.OBJECTLINES, false, null, 
null), req);
+
+    // Truncated response: missing final LFLF
+    Assert.assertNull(response.lhs);
+    Assert.assertEquals("{\"cnt\":1}\n{\"cnt\":1}", response.rhs);
+  }
+
+  @Test
+  public void testCsvResultFormatWithErrorAfterFirstRow() throws Exception
+  {
+    sequenceMapFnSupplier.set(errorAfterSecondRowMapFn());
+
+    final String query = "SELECT cnt FROM foo";
+    final Pair<QueryException, String> response =
+        doPostRaw(new SqlQuery(query, ResultFormat.CSV, false, null, null), 
req);
+
+    // Truncated response: missing final LFLF
+    Assert.assertNull(response.lhs);
+    Assert.assertEquals("1\n1\n", response.rhs);
+  }
+
+  @Test
   public void testArrayResultFormatWithHeader() throws Exception
   {
     final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo 
LIMIT 2";
@@ -1128,7 +1202,14 @@ public class SqlResourceTest extends CalciteTestBase
     if (response.getStatus() == 200) {
       final StreamingOutput output = (StreamingOutput) response.getEntity();
       final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      output.write(baos);
+      try {
+        output.write(baos);
+      }
+      catch (Exception ignored) {
+        // Suppress errors and return the response so far. Similar to what the 
real web server would do, if it
+        // started writing a 200 OK and then threw an exception in the middle.
+      }
+
       return Pair.of(
           null,
           new String(baos.toByteArray(), StandardCharsets.UTF_8)
@@ -1180,11 +1261,26 @@ public class SqlResourceTest extends CalciteTestBase
     return req;
   }
 
+  private static Function<Sequence<Object[]>, Sequence<Object[]>> 
errorAfterSecondRowMapFn()
+  {
+    return results -> {
+      final AtomicLong rows = new AtomicLong();
+      return results.map(row -> {
+        if (rows.incrementAndGet() == 3) {
+          throw new ISE("Oh no!");
+        } else {
+          return row;
+        }
+      });
+    };
+  }
+
   private static class TestSqlLifecycle extends SqlLifecycle
   {
     private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
validateAndAuthorizeLatchSupplier;
     private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
planLatchSupplier;
     private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
executeLatchSupplier;
+    private final SettableSupplier<Function<Sequence<Object[]>, 
Sequence<Object[]>>> sequenceMapFnSupplier;
 
     private TestSqlLifecycle(
         PlannerFactory plannerFactory,
@@ -1195,13 +1291,15 @@ public class SqlResourceTest extends CalciteTestBase
         long startNs,
         SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
validateAndAuthorizeLatchSupplier,
         SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
planLatchSupplier,
-        SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
executeLatchSupplier
+        SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
executeLatchSupplier,
+        SettableSupplier<Function<Sequence<Object[]>, Sequence<Object[]>>> 
sequenceMapFnSupplier
     )
     {
       super(plannerFactory, emitter, requestLogger, queryScheduler, startMs, 
startNs);
       this.validateAndAuthorizeLatchSupplier = 
validateAndAuthorizeLatchSupplier;
       this.planLatchSupplier = planLatchSupplier;
       this.executeLatchSupplier = executeLatchSupplier;
+      this.sequenceMapFnSupplier = sequenceMapFnSupplier;
     }
 
     @Override
@@ -1253,9 +1351,12 @@ public class SqlResourceTest extends CalciteTestBase
     @Override
     public Sequence<Object[]> execute()
     {
+      final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn =
+          
Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
+
       if (executeLatchSupplier.get() != null) {
         if (executeLatchSupplier.get().rhs) {
-          Sequence<Object[]> sequence = super.execute();
+          Sequence<Object[]> sequence = sequenceMapFn.apply(super.execute());
           executeLatchSupplier.get().lhs.countDown();
           return sequence;
         } else {
@@ -1267,10 +1368,10 @@ public class SqlResourceTest extends CalciteTestBase
           catch (InterruptedException e) {
             throw new RuntimeException(e);
           }
-          return super.execute();
+          return sequenceMapFn.apply(super.execute());
         }
       } else {
-        return super.execute();
+        return sequenceMapFn.apply(super.execute());
       }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to