This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 44522756382d [SPARK-45794][SS] Introduce state metadata source to query the streaming state metadata information
44522756382d is described below

commit 44522756382dd628c2278d22df4cf7db4461079d
Author: Chaoqin Li <chaoqin...@databricks.com>
AuthorDate: Wed Nov 8 14:04:39 2023 +0900

    [SPARK-45794][SS] Introduce state metadata source to query the streaming state metadata information
    
    ### What changes were proposed in this pull request?
    
    Introduce a new data source so that users can query the metadata of each state store of a streaming query. The schema of the result is as follows:
    
    | column | type |
    | --------- | ------- |
    | operatorId | LONG |
    | operatorName | STRING |
    | stateStoreName | STRING |
    | numPartitions | INT |
    | minBatchId | LONG |
    | maxBatchId | LONG |
    | _numColsPrefixKey (metadata column) | INT |
    
    To use this source, specify the source format and the checkpoint path, then load the DataFrame:
    
    `df = spark.read.format("state-metadata").load("/checkpointPath")`
    
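    For example, a minimal Scala sketch (the checkpoint path is a placeholder, and an active SparkSession `spark` is assumed): regular columns come back directly, while `_numColsPrefixKey` is a metadata column and must be selected explicitly via `Dataset.metadataColumn`:

    ```scala
    // Load the state metadata of a streaming query from its checkpoint location.
    val df = spark.read.format("state-metadata").load("/checkpointPath")

    // Project regular columns together with the _numColsPrefixKey metadata column.
    df.select(
        df.col("operatorId"),
        df.col("stateStoreName"),
        df.col("minBatchId"),
        df.col("maxBatchId"),
        df.metadataColumn("_numColsPrefixKey"))
      .show()
    ```
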
    ### Why are the changes needed?
    
    To improve debuggability. It also facilitates querying the state data source introduced in SPARK-45511 by exposing the operator id, batch id, and state store name.
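
    A hedged Scala sketch of that workflow (assuming the `statestore` format and its `operatorId`, `storeName`, and `batchId` options come from SPARK-45511; those names are from that change, not guaranteed by this patch):

    ```scala
    // Discover which operators and state stores exist, and which batches are available.
    val meta = spark.read.format("state-metadata").load("/checkpointPath")
    val row = meta.select("operatorId", "stateStoreName", "maxBatchId").head()

    // Feed those values into the state data source to inspect the actual state rows.
    val state = spark.read.format("statestore")
      .option("operatorId", row.getLong(0))
      .option("storeName", row.getString(1))
      .option("batchId", row.getLong(2))
      .load("/checkpointPath")
    ```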
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this is a new source exposed to users.
    
    ### How was this patch tested?
    
    Added a test to verify the output of the state metadata source.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43660 from chaoqin-li1123/state_metadata_source.
    
    Authored-by: Chaoqin Li <chaoqin...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 ...org.apache.spark.sql.sources.DataSourceRegister |   3 +-
 .../v2/state/metadata/StateMetadataSource.scala    | 214 +++++++++++++++++++++
 .../state/OperatorStateMetadataSuite.scala         |  37 +++-
 3 files changed, 252 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 3169e75031fc..b4c18c38f04a 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -27,4 +27,5 @@ org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
 org.apache.spark.sql.execution.datasources.binaryfile.BinaryFileFormat
-org.apache.spark.sql.execution.streaming.sources.RatePerMicroBatchProvider
\ No newline at end of file
+org.apache.spark.sql.execution.streaming.sources.RatePerMicroBatchProvider
+org.apache.spark.sql.execution.datasources.v2.state.StateMetadataSource
\ No newline at end of file
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
new file mode 100644
index 000000000000..8a74db8d1963
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, PathFilter}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, TableProvider}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadata, OperatorStateMetadataReader, OperatorStateMetadataV1}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.SerializableConfiguration
+
+case class StateMetadataTableEntry(
+    operatorId: Long,
+    operatorName: String,
+    stateStoreName: String,
+    numPartitions: Int,
+    minBatchId: Long,
+    maxBatchId: Long,
+    numColsPrefixKey: Int) {
+  def toRow(): InternalRow = {
+    InternalRow.fromSeq(
+      Seq(operatorId,
+        UTF8String.fromString(operatorName),
+        UTF8String.fromString(stateStoreName),
+        numPartitions,
+        minBatchId,
+        maxBatchId,
+        numColsPrefixKey))
+  }
+}
+
+object StateMetadataTableEntry {
+  private[sql] val schema = {
+    new StructType()
+      .add("operatorId", LongType)
+      .add("operatorName", StringType)
+      .add("stateStoreName", StringType)
+      .add("numPartitions", IntegerType)
+      .add("minBatchId", LongType)
+      .add("maxBatchId", LongType)
+  }
+}
+
+class StateMetadataSource extends TableProvider with DataSourceRegister {
+  override def shortName(): String = "state-metadata"
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    new StateMetadataTable
+  }
+
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    // The schema of the state metadata table is static.
+    StateMetadataTableEntry.schema
+  }
+}
+
+
+class StateMetadataTable extends Table with SupportsRead with SupportsMetadataColumns {
+  override def name(): String = "state-metadata-table"
+
+  override def schema(): StructType = StateMetadataTableEntry.schema
+
+  override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_READ).asJava
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    () => {
+      if (!options.containsKey("path")) {
+        throw new IllegalArgumentException("Checkpoint path is not specified for" +
+          " state metadata data source.")
+      }
+      new StateMetadataScan(options.get("path"))
+    }
+  }
+
+  private object NumColsPrefixKeyColumn extends MetadataColumn {
+    override def name: String = "_numColsPrefixKey"
+    override def dataType: DataType = IntegerType
+    override def comment: String = "Number of columns in prefix key of the state store instance"
+  }
+
+  override val metadataColumns: Array[MetadataColumn] = Array(NumColsPrefixKeyColumn)
+}
+
+case class StateMetadataInputPartition(checkpointLocation: String) extends InputPartition
+
+class StateMetadataScan(checkpointLocation: String) extends Scan {
+  override def readSchema: StructType = StateMetadataTableEntry.schema
+
+  override def toBatch: Batch = {
+    new Batch {
+      override def planInputPartitions(): Array[InputPartition] = {
+        Array(StateMetadataInputPartition(checkpointLocation))
+      }
+
+      override def createReaderFactory(): PartitionReaderFactory = {
+        // Don't need to broadcast the hadoop conf because this source only has one partition.
+        val conf = new SerializableConfiguration(SparkSession.active.sessionState.newHadoopConf())
+        StateMetadataPartitionReaderFactory(conf)
+      }
+    }
+  }
+}
+
+case class StateMetadataPartitionReaderFactory(hadoopConf: SerializableConfiguration)
+  extends PartitionReaderFactory {
+
+  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
+    new StateMetadataPartitionReader(
+      partition.asInstanceOf[StateMetadataInputPartition].checkpointLocation, hadoopConf)
+  }
+}
+
+class StateMetadataPartitionReader(
+    checkpointLocation: String,
+    serializedHadoopConf: SerializableConfiguration) extends PartitionReader[InternalRow] {
+
+  override def next(): Boolean = {
+    stateMetadata.hasNext
+  }
+
+  override def get(): InternalRow = {
+    stateMetadata.next().toRow()
+  }
+
+  override def close(): Unit = {}
+
+  private def pathToLong(path: Path) = {
+    path.getName.toLong
+  }
+
+  private def pathNameCanBeParsedAsLong(path: Path) = {
+    try {
+      pathToLong(path)
+      true
+    } catch {
+      case _: NumberFormatException => false
+    }
+  }
+
+  // Returns true when the file name can be parsed as a long integer.
+  private val pathNameCanBeParsedAsLongFilter = new PathFilter {
+    override def accept(path: Path): Boolean = pathNameCanBeParsedAsLong(path)
+  }
+
+  private lazy val hadoopConf: Configuration = serializedHadoopConf.value
+
+  private lazy val fileManager =
+    CheckpointFileManager.create(new Path(checkpointLocation), hadoopConf)
+
+  // List the commit log entries to find all the available batch ids.
+  private def batchIds: Array[Long] = {
+    val commitLog = new Path(checkpointLocation, "commits")
+    if (fileManager.exists(commitLog)) {
+      fileManager
+        .list(commitLog, pathNameCanBeParsedAsLongFilter).map(f => pathToLong(f.getPath)).sorted
+    } else Array.empty
+  }
+
+  private def allOperatorStateMetadata: Array[OperatorStateMetadata] = {
+    val stateDir = new Path(checkpointLocation, "state")
+    val opIds = fileManager
+      .list(stateDir, pathNameCanBeParsedAsLongFilter).map(f => pathToLong(f.getPath)).sorted
+    opIds.map { opId =>
+      new OperatorStateMetadataReader(new Path(stateDir, opId.toString), hadoopConf).read()
+    }
+  }
+
+  private lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {
+    allOperatorStateMetadata.flatMap { operatorStateMetadata =>
+      require(operatorStateMetadata.version == 1)
+      val operatorStateMetadataV1 = operatorStateMetadata.asInstanceOf[OperatorStateMetadataV1]
+      operatorStateMetadataV1.stateStoreInfo.map { stateStoreMetadata =>
+        StateMetadataTableEntry(operatorStateMetadataV1.operatorInfo.operatorId,
+          operatorStateMetadataV1.operatorInfo.operatorName,
+          stateStoreMetadata.storeName,
+          stateStoreMetadata.numPartitions,
+          if (batchIds.nonEmpty) batchIds.head else -1,
+          if (batchIds.nonEmpty) batchIds.last else -1,
+          stateStoreMetadata.numColsPrefixKey
+        )
+      }
+    }
+  }.iterator
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
index 48cc17bbbabf..340187fa4951 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming.state
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.Column
+import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
@@ -53,6 +53,15 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
       val operatorMetadata = OperatorStateMetadataV1(operatorInfo, stateStoreInfo.toArray)
       new OperatorStateMetadataWriter(statePath, hadoopConf).write(operatorMetadata)
       checkOperatorStateMetadata(checkpointDir.toString, 0, operatorMetadata)
+      val df = spark.read.format("state-metadata").load(checkpointDir.toString)
+      // The commit log is empty, so there is no available batch id.
+      checkAnswer(df, Seq(Row(1, "Join", "store1", 200, -1L, -1L),
+          Row(1, "Join", "store2", 200, -1L, -1L),
+          Row(1, "Join", "store3", 200, -1L, -1L),
+          Row(1, "Join", "store4", 200, -1L, -1L)
+        ))
+      checkAnswer(df.select(df.metadataColumn("_numColsPrefixKey")),
+        Seq(Row(1), Row(1), Row(1), Row(1)))
     }
   }
 
@@ -105,6 +114,16 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
       val expectedMetadata = OperatorStateMetadataV1(
         OperatorInfoV1(0, "symmetricHashJoin"), expectedStateStoreInfo)
       checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata)
+
+      val df = spark.read.format("state-metadata")
+        .load(checkpointDir.toString)
+      checkAnswer(df, Seq(Row(0, "symmetricHashJoin", "left-keyToNumValues", 5, 0L, 1L),
+          Row(0, "symmetricHashJoin", "left-keyWithIndexToValue", 5, 0L, 1L),
+          Row(0, "symmetricHashJoin", "right-keyToNumValues", 5, 0L, 1L),
+          Row(0, "symmetricHashJoin", "right-keyWithIndexToValue", 5, 0L, 1L)
+        ))
+      checkAnswer(df.select(df.metadataColumn("_numColsPrefixKey")),
+        Seq(Row(0), Row(0), Row(0), Row(0)))
     }
   }
 
@@ -147,6 +166,10 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
         Array(StateStoreMetadataV1("default", 1, spark.sessionState.conf.numShufflePartitions))
       )
       checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata)
+
+      val df = spark.read.format("state-metadata").load(checkpointDir.toString)
+      checkAnswer(df, Seq(Row(0, "sessionWindowStateStoreSaveExec", "default", 5, 0L, 0L)))
+      checkAnswer(df.select(df.metadataColumn("_numColsPrefixKey")), Seq(Row(1)))
     }
   }
 
@@ -176,6 +199,18 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
         Array(StateStoreMetadataV1("default", 0, numShufflePartitions)))
       checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata0)
       checkOperatorStateMetadata(checkpointDir.toString, 1, expectedMetadata1)
+
+      val df = spark.read.format("state-metadata").load(checkpointDir.toString)
+      checkAnswer(df, Seq(Row(0, "stateStoreSave", "default", 5, 0L, 1L),
+          Row(1, "stateStoreSave", "default", 5, 0L, 1L)))
+      checkAnswer(df.select(df.metadataColumn("_numColsPrefixKey")), Seq(Row(0), Row(0)))
+    }
+  }
+
+  test("State metadata data source handle missing argument") {
+    val e = intercept[IllegalArgumentException] {
+      spark.read.format("state-metadata").load().collect()
     }
+    assert(e.getMessage == "Checkpoint path is not specified for state metadata data source.")
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
