This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d9bfb675f3 [SPARK-38787][SS] Replace found value with non-null element in the remaining list for key and remove remaining null elements from values in keyWithIndexToValue store for stream-stream joins
6d9bfb675f3 is described below

commit 6d9bfb675f3e58c6e7d9facd8cf3f22069c4cc48
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Thu Apr 7 05:51:57 2022 +0900

    [SPARK-38787][SS] Replace found value with non-null element in the remaining list for key and remove remaining null elements from values in keyWithIndexToValue store for stream-stream joins
    
    ### What changes were proposed in this pull request?
    
    In stream-stream joins, old state is removed by value (value watermark) by calling the `removeByValue` function with a removal condition. Within the iterator it returns, when the matched value sits at a non-last index and the last element for that key is null, we currently neither swap nor remove the matched value. With this change, we find the rightmost non-null value starting from the end, swap it into the current index, remove the remaining null elements after that non-null index, and then drop the last element as before.
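
    For illustration, here is a minimal, self-contained Scala sketch of the swap-and-truncate idea on a plain buffer. This is not the state store API; the function name and the `ArrayBuffer[Option[Int]]` model (with nulls represented as `None`) are hypothetical:

    ```scala
    import scala.collection.mutable.ArrayBuffer

    // Evict the matched index: swap in the rightmost non-null element (if any)
    // and truncate the trailing slots, mirroring the behavior described above.
    def evictWithNullAwareSwap(values: ArrayBuffer[Option[Int]], matchedIndex: Int): Unit = {
      // Rightmost non-null index strictly after matchedIndex; fall back to matchedIndex itself.
      val nonNullIndex = (values.length - 1 to matchedIndex + 1 by -1)
        .find(idx => values(idx).isDefined)
        .getOrElse(matchedIndex)
      if (nonNullIndex != matchedIndex) {
        // Move the rightmost non-null value into the slot being evicted.
        values(matchedIndex) = values(nonNullIndex)
      }
      // Drop everything from nonNullIndex to the end (the moved slot plus trailing nulls).
      values.remove(nonNullIndex, values.length - nonNullIndex)
    }

    // Example: index 1 matched the removal condition; indices 3 and 4 are null.
    val buf = ArrayBuffer[Option[Int]](Some(100), Some(200), Some(300), None, None)
    evictWithNullAwareSwap(buf, 1)
    assert(buf == ArrayBuffer[Option[Int]](Some(100), Some(300)))
    ```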
    
    ### Why are the changes needed?
    
    This change fixes a bug in the symmetric hash join code where `removeByValue` did not replace the found/matched value when null values were encountered at the end of the list.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a unit test for this change that exercises null values (a condensed sketch of the test pattern follows the output below). Here is a sample output:
    ```
    Executing tests from //sql/core:org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite-hive-2.3__hadoop-3.2
    -----------------------------------------------------------------------------
    2022-04-01 21:11:59,641 INFO  CodeGenerator - Code generated in 225.884757 ms
    2022-04-01 21:11:59,662 INFO  CodeGenerator - Code generated in 10.870786 ms
    Run starting. Expected test count is: 4
    …
    ===== TEST OUTPUT FOR o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' =====

    2022-04-01 21:12:03,487 INFO  StateStore - State Store maintenance task started
    2022-04-01 21:12:03,508 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp
    2022-04-01 21:12:03,524 INFO  CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema
    2022-04-01 21:12:03,525 INFO  StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef374ccb9
    2022-04-01 21:12:03,525 INFO  StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues),47925997-9891-4025-a36a-3e18bc758b50) is active
    2022-04-01 21:12:03,525 INFO  HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues] for update
    2022-04-01 21:12:03,525 INFO  SymmetricHashJoinStateManager$KeyToNumValuesStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues)
    2022-04-01 21:12:03,541 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp
    2022-04-01 21:12:03,556 INFO  CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema
    2022-04-01 21:12:03,558 INFO  StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef1ea930eb
    2022-04-01 21:12:03,559 INFO  StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue),47925997-9891-4025-a36a-3e18bc758b50) is active
    2022-04-01 21:12:03,559 INFO  HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue] for update
    2022-04-01 21:12:03,559 INFO  SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue)
    2022-04-01 21:12:03,564 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/.1.delta.86db3ac9-aa68-4a6b-9729-df93dc4b8a45.tmp
    2022-04-01 21:12:03,568 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/.1.delta.9673bc1b-2bbe-412d-a0af-69f237cde31e.tmp
    2022-04-01 21:12:03,572 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 4 at current key [false,40,10.0].
    2022-04-01 21:12:03,574 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,40,10.0].
    2022-04-01 21:12:03,576 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,60,10.0].
    2022-04-01 21:12:03,576 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 1 at current key [false,40,10.0].
    2022-04-01 21:12:03,577 INFO  SymmetricHashJoinStateManager$KeyToNumValuesStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues)
    2022-04-01 21:12:03,577 INFO  HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues]
    2022-04-01 21:12:03,577 INFO  SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue)
    2022-04-01 21:12:03,577 INFO  HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue]
    2022-04-01 21:12:03,580 INFO  StateStore - StateStore stopped
    2022-04-01 21:12:03,580 INFO  SymmetricHashJoinStateManagerSuite -

    ===== FINISHED o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' =====
    …
    2022-04-01 21:12:04,205 INFO  StateStore - StateStore stopped
    Run completed in 5 seconds, 908 milliseconds.
    Total number of tests run: 4
    Suites: completed 1, aborted 0
    Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
    All tests passed.
    2022-04-01 21:12:04,605 INFO  ShutdownHookManager - Shutdown hook called
    2022-04-01 21:12:04,605 INFO  ShutdownHookManager - Deleting directory /tmp/spark-37347802-bee5-4e7f-bffe-acb13eda1c5c
    2022-04-01 21:12:04,608 INFO  ShutdownHookManager - Deleting directory /tmp/spark-9e79a2e1-cec7-4fbf-804a-92e63913f516
    ```
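
    For reference, a condensed sketch of the test pattern, using the suite's helpers shown in the diff below (e.g. `appendAndTest`, `updateNumValues`, `removeByValue`); the full test is in `SymmetricHashJoinStateManagerSuite`:

    ```scala
    // Inside the suite, for a given stateFormatVersion:
    withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) { manager =>
      implicit val mgr = manager

      appendAndTest(40, 100, 200, 300)  // three real values for key 40
      updateNumValues(40, 5)            // claim 5 values so indices 3 and 4 read back as null

      removeByValue(125)                // must evict 100 and compact around the trailing nulls
      assert(get(40) === Seq(200, 300))
    }
    ```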
    
    Closes #36073 from anishshri-db/bfix/SPARK-38787.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../state/SymmetricHashJoinStateManager.scala      | 46 +++++++++++++++--
 .../state/SymmetricHashJoinStateManagerSuite.scala | 60 +++++++++++++++++++---
 2 files changed, 97 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index f301d233cb0..56c47d564a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -256,6 +256,16 @@ class SymmetricHashJoinStateManager(
         return null
       }
 
+      /**
+       * Find the first non-null value index, starting from the end
+       * and going up to stopIndex.
+       */
+      private def getRightMostNonNullIndex(stopIndex: Long): Option[Long] = {
+        (numValues - 1 to stopIndex by -1).find { idx =>
+          keyWithIndexToValue.get(currentKey, idx) != null
+        }
+      }
+
       override def getNext(): KeyToValuePair = {
         val currentValue = findNextValueForIndex()
 
@@ -272,12 +282,33 @@ class SymmetricHashJoinStateManager(
         if (index != numValues - 1) {
           val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey, numValues - 1)
           if (valuePairAtMaxIndex != null) {
+            // Likely case where the last element is non-null and we can simply swap with index.
             keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value,
               valuePairAtMaxIndex.matched)
           } else {
-            val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
-            logWarning(s"`keyWithIndexToValue` returns a null value for index ${numValues - 1} " +
-              s"at current key $projectedKey.")
+            // Find the rightmost non-null index and swap values with that index,
+            // if the returned index is not the same as the passed one
+            val nonNullIndex = getRightMostNonNullIndex(index + 1).getOrElse(index)
+            if (nonNullIndex != index) {
+              val valuePair = keyWithIndexToValue.get(currentKey, nonNullIndex)
+              keyWithIndexToValue.put(currentKey, index, valuePair.value,
+                valuePair.matched)
+            }
+
+            // If nulls were found at the end, log a warning for the range of null indices.
+            if (nonNullIndex != numValues - 1) {
+              logWarning(s"`keyWithIndexToValue` returns a null value for indices " +
+                s"with range from startIndex=${nonNullIndex + 1} " +
+                s"and endIndex=${numValues - 1}.")
+            }
+
+            // Remove all null values from nonNullIndex + 1 onwards.
+            // The nonNullIndex itself will be handled by removing the last entry,
+            // similar to finding the value as the last element.
+            (numValues - 1 to nonNullIndex + 1 by -1).foreach { removeIndex =>
+              keyWithIndexToValue.remove(currentKey, removeIndex)
+              numValues -= 1
+            }
           }
         }
         keyWithIndexToValue.remove(currentKey, numValues - 1)
@@ -324,6 +355,15 @@ class SymmetricHashJoinStateManager(
     )
   }
 
+  /**
+   * Update number of values for a key.
+   * NOTE: this function is only intended for use in unit tests
+   * to simulate null values.
+   */
+  private[state] def updateNumValuesTestOnly(key: UnsafeRow, numValues: Long): Unit = {
+    keyToNumValues.put(key, numValues)
+  }
+
   /*
   =====================================================
             Private methods and inner classes
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
index 8a03d46d000..deeebe1fc42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
@@ -46,6 +46,12 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
     }
   }
 
+  SymmetricHashJoinStateManager.supportedVersions.foreach { version =>
+    test(s"StreamingJoinStateManager V${version} - all operations with nulls") 
{
+      testAllOperationsWithNulls(version)
+    }
+  }
+
   SymmetricHashJoinStateManager.supportedVersions.foreach { version =>
     test(s"SPARK-35689: StreamingJoinStateManager V${version} - " +
         "printable key of keyWithIndexToValue") {
@@ -68,7 +74,6 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
     }
   }
 
-
   private def testAllOperations(stateFormatVersion: Int): Unit = {
     withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) { manager =>
       implicit val mgr = manager
@@ -99,11 +104,6 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
       assert(get(30) === Seq.empty)     // should remove 30
       assert(numRows === 0)
 
-      def appendAndTest(key: Int, values: Int*): Unit = {
-        values.foreach { value => append(key, value)}
-        require(get(key) === values)
-      }
-
       appendAndTest(40, 100, 200, 300)
       appendAndTest(50, 125)
       appendAndTest(60, 275)              // prepare for testing removeByValue
@@ -130,6 +130,43 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
       assert(numRows === 0)
     }
   }
+
+  /* Test removeByValue with nulls simulated by updating numValues on the state manager */
+  private def testAllOperationsWithNulls(stateFormatVersion: Int): Unit = {
+    withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) { manager =>
+      implicit val mgr = manager
+
+      appendAndTest(40, 100, 200, 300)
+      appendAndTest(50, 125)
+      appendAndTest(60, 275)              // prepare for testing removeByValue
+      assert(numRows === 5)
+
+      updateNumValues(40, 5)   // update total values to 5 to create 2 nulls
+      removeByValue(125)
+      assert(get(40) === Seq(200, 300))
+      assert(get(50) === Seq.empty)
+      assert(get(60) === Seq(275))        // should remove only some values, not all and nulls
+      assert(numRows === 3)
+
+      append(40, 50)
+      assert(get(40) === Seq(50, 200, 300))
+      assert(numRows === 4)
+      updateNumValues(40, 4)   // update total values to 4 to create 1 null
+
+      removeByValue(200)
+      assert(get(40) === Seq(300))
+      assert(get(60) === Seq(275))        // should remove only some values, not all and nulls
+      assert(numRows === 2)
+      updateNumValues(40, 2)   // update total values to simulate nulls
+      updateNumValues(60, 4)
+
+      removeByValue(300)
+      assert(get(40) === Seq.empty)
+      assert(get(60) === Seq.empty)       // should remove all values now including nulls
+      assert(numRows === 0)
+    }
+  }
+
   val watermarkMetadata = new MetadataBuilder().putLong(EventTimeWatermark.delayKey, 10).build()
   val inputValueSchema = new StructType()
     .add(StructField("time", IntegerType, metadata = watermarkMetadata))
@@ -157,6 +194,17 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
     manager.append(toJoinKeyRow(key), toInputValue(value), matched = false)
   }
 
+  def appendAndTest(key: Int, values: Int*)
+                   (implicit manager: SymmetricHashJoinStateManager): Unit = {
+    values.foreach { value => append(key, value)}
+    require(get(key) === values)
+  }
+
+  def updateNumValues(key: Int, numValues: Long)
+                     (implicit manager: SymmetricHashJoinStateManager): Unit = {
+    manager.updateNumValuesTestOnly(toJoinKeyRow(key), numValues)
+  }
+
   def get(key: Int)(implicit manager: SymmetricHashJoinStateManager): Seq[Int] = {
     manager.get(toJoinKeyRow(key)).map(toValueInt).toSeq.sorted
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
