steFaiz commented on code in PR #6956:
URL: https://github.com/apache/paimon/pull/6956#discussion_r2663906973


##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala:
##########
@@ -137,4 +140,219 @@ class CreateGlobalIndexProcedureTest extends 
PaimonSparkTestBase with StreamTest
       assert(totalRowCount == 189088L)
     }
   }
+
+  test("create btree global index") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true',
+                  |  'btree-index.records-per-range' = '1000')
+                  |""".stripMargin)
+
+      val values =
+        (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output =
+        spark
+          .sql(
+            "CALL sys.create_global_index(table => 'test.T', index_column => 
'name', index_type => 'btree'," +
+              " options => 'btree-index.records-per-range=1000')")
+          .collect()
+          .head
+
+      assert(output.getBoolean(0))
+      val table = loadTable("T")
+      val btreeEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == "btree")
+        .map(_.indexFile())
+      table.store().newGlobalIndexScanBuilder().shardList()
+      assert(btreeEntries.nonEmpty)
+
+      // 1. assert total row count and file count
+      val totalRowCount = btreeEntries.map(_.rowCount()).sum
+      assert(btreeEntries.size == 100)
+      assert(totalRowCount == 100000L)
+
+      // 2. assert global index meta not null
+      btreeEntries.foreach(e => assert(e.globalIndexMeta() != null))
+
+      // 3. assert btree index file range non-overlapping
+      case class MetaWithKey(meta: BTreeIndexMeta, first: Object, last: Object)
+      val keySerializer = KeySerializer.create(new VarCharType())
+      val comparator = keySerializer.createComparator()
+
+      def deserialize(bytes: Array[Byte]): Object = {
+        keySerializer.deserialize(MemorySlice.wrap(bytes))
+      }
+
+      val btreeMetas = btreeEntries
+        .map(_.globalIndexMeta().indexMeta())
+        .map(meta => BTreeIndexMeta.deserialize(meta))
+        .map(
+          m => {
+            assert(m.getFirstKey != null)
+            assert(m.getLastKey != null)
+            MetaWithKey(m, deserialize(m.getFirstKey), 
deserialize(m.getLastKey))
+          })
+
+      // sort by first key
+      val sorted = btreeMetas.sortWith((m1, m2) => 
comparator.compare(m1.first, m2.first) < 0)
+
+      // should not overlap
+      sorted.sliding(2).foreach {
+        case Seq(prev: MetaWithKey, next: MetaWithKey) =>
+          assert(comparator.compare(prev.last, next.first) <= 0)
+        case _ => // ignore
+      }
+    }
+  }
+
+  test("create btree global index with multiple partitions") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING, pt STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |  PARTITIONED BY (pt)
+                  |""".stripMargin)
+
+      var values =
+        (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 100).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output =
+        spark
+          .sql(
+            "CALL sys.create_global_index(table => 'test.T', index_column => 
'name', index_type => 'btree'," +
+              " options => 'btree-index.records-per-range=1000')")
+          .collect()
+          .head
+
+      assert(output.getBoolean(0))
+
+      assertMultiplePartitionsResult("T", 189088L, 3)
+    }
+  }
+
+  test("create btree index within one spark partition") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING, pt STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |  PARTITIONED BY (pt)
+                  |""".stripMargin)
+
+      var values =
+        (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      // force output parallelism = 1
+      val output =
+        spark
+          .sql("CALL sys.create_global_index(table => 'test.T', index_column 
=> 'name', index_type => 'btree'," +
+            " options => 
'btree-index.records-per-range=1000,btree-index.build.max-parallelism=1')")
+          .collect()
+          .head
+
+      assert(output.getBoolean(0))
+
+      assertMultiplePartitionsResult("T", 100000L, 2)
+    }
+  }
+
+  private def assertMultiplePartitionsResult(
+      tableName: String,
+      rowCount: Long,
+      partCount: Int
+  ): Unit = {
+    val table = loadTable(tableName)
+    val btreeEntries = table
+      .store()
+      .newIndexFileHandler()
+      .scanEntries()
+      .asScala
+      .filter(_.indexFile().indexType() == "btree")
+    table.store().newGlobalIndexScanBuilder().shardList()

Review Comment:
   This follows current test case.



##########
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala:
##########
@@ -137,4 +140,219 @@ class CreateGlobalIndexProcedureTest extends 
PaimonSparkTestBase with StreamTest
       assert(totalRowCount == 189088L)
     }
   }
+
+  test("create btree global index") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true',
+                  |  'btree-index.records-per-range' = '1000')
+                  |""".stripMargin)
+
+      val values =
+        (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output =
+        spark
+          .sql(
+            "CALL sys.create_global_index(table => 'test.T', index_column => 
'name', index_type => 'btree'," +
+              " options => 'btree-index.records-per-range=1000')")
+          .collect()
+          .head
+
+      assert(output.getBoolean(0))
+      val table = loadTable("T")
+      val btreeEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == "btree")
+        .map(_.indexFile())
+      table.store().newGlobalIndexScanBuilder().shardList()

Review Comment:
   This follows current test case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to