This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch ISSUE-6353
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit bea7b1c2682cf58a76777fb05a93f718abc27943
Author: FANNG <[email protected]>
AuthorDate: Thu Jan 16 16:59:47 2025 +0800

    [#6196] feat(iceberg): adjust table distribution if creating table without 
specifying distribution mode (#6214)
    
    ### What changes were proposed in this pull request?
    
    Adjust the distribution mode when creating an Iceberg table with `NONE`
    distribution. The following is Spark's adjustment logic; Flink's logic
    is similar.
    
    ```java
      private DistributionMode defaultWriteDistributionMode() {
        if (table.sortOrder().isSorted()) {
          return RANGE;
        } else if (table.spec().isPartitioned()) {
          return HASH;
        } else {
          return NONE;
        }
      }
    ```
    
    ### Why are the changes needed?
    
    Fix: #6196
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, add document
    
    ### How was this patch tested?
    
    add UT and IT
---
 .../iceberg/IcebergCatalogOperations.java          | 18 +++++
 .../catalog/lakehouse/iceberg/IcebergTable.java    | 36 +++++-----
 .../integration/test/CatalogIcebergBaseIT.java     | 80 ++++++++++++++++++++--
 docs/lakehouse-iceberg-catalog.md                  | 75 ++------------------
 4 files changed, 117 insertions(+), 92 deletions(-)

diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
index 7b27438d2e..aef42044c0 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
@@ -58,6 +58,7 @@ import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
@@ -513,6 +514,13 @@ public class IcebergCatalogOperations implements 
CatalogOperations, SupportsSche
                           .build())
               .toArray(IcebergColumn[]::new);
 
+      // Gravitino NONE distribution means the client side doesn't specify 
distribution, which is
+      // not the same as none distribution in Iceberg.
+      if (Distributions.NONE.equals(distribution)) {
+        distribution =
+            getIcebergDefaultDistribution(sortOrders.length > 0, 
partitioning.length > 0);
+      }
+
       IcebergTable createdTable =
           IcebergTable.builder()
               .withName(tableIdent.name())
@@ -588,6 +596,16 @@ public class IcebergCatalogOperations implements 
CatalogOperations, SupportsSche
     }
   }
 
+  private static Distribution getIcebergDefaultDistribution(
+      boolean isSorted, boolean isPartitioned) {
+    if (isSorted) {
+      return Distributions.RANGE;
+    } else if (isPartitioned) {
+      return Distributions.HASH;
+    }
+    return Distributions.NONE;
+  }
+
   private static String currentUser() {
     return PrincipalUtils.getCurrentUserName();
   }
diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
index 27e3c429e7..3f2f54c1b3 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
@@ -152,21 +152,6 @@ public class IcebergTable extends BaseTable {
     Schema schema = table.schema();
     Transform[] partitionSpec = 
FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema);
     SortOrder[] sortOrder = 
FromIcebergSortOrder.fromSortOrder(table.sortOrder());
-    Distribution distribution = Distributions.NONE;
-    String distributionName = 
properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE);
-    if (null != distributionName) {
-      switch (DistributionMode.fromName(distributionName)) {
-        case HASH:
-          distribution = Distributions.HASH;
-          break;
-        case RANGE:
-          distribution = Distributions.RANGE;
-          break;
-        default:
-          // do nothing
-          break;
-      }
-    }
     IcebergColumn[] icebergColumns =
         
schema.columns().stream().map(ConvertUtil::fromNestedField).toArray(IcebergColumn[]::new);
     return IcebergTable.builder()
@@ -178,7 +163,7 @@ public class IcebergTable extends BaseTable {
         .withAuditInfo(AuditInfo.EMPTY)
         .withPartitioning(partitionSpec)
         .withSortOrders(sortOrder)
-        .withDistribution(distribution)
+        .withDistribution(getDistribution(properties))
         .build();
   }
 
@@ -236,4 +221,23 @@ public class IcebergTable extends BaseTable {
   public static Builder builder() {
     return new Builder();
   }
+
+  private static Distribution getDistribution(Map<String, String> properties) {
+    Distribution distribution = Distributions.NONE;
+    String distributionName = 
properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE);
+    if (null != distributionName) {
+      switch (DistributionMode.fromName(distributionName)) {
+        case HASH:
+          distribution = Distributions.HASH;
+          break;
+        case RANGE:
+          distribution = Distributions.RANGE;
+          break;
+        default:
+          // do nothing
+          break;
+      }
+    }
+    return distribution;
+  }
 }
diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
 
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
index fd37441b45..f0162a6ec8 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
@@ -379,6 +379,76 @@ public abstract class CatalogIcebergBaseIT extends BaseIT {
     Assertions.assertNull(loadTable.comment());
   }
 
+  @Test
+  void testCreateTableWithNoneDistribution() {
+    // Create table from Gravitino API
+    Column[] columns = createColumns();
+
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Distribution distribution = Distributions.NONE;
+
+    final SortOrder[] sortOrders =
+        new SortOrder[] {
+          SortOrders.of(
+              NamedReference.field(ICEBERG_COL_NAME2),
+              SortDirection.DESCENDING,
+              NullOrdering.NULLS_FIRST)
+        };
+
+    Transform[] partitioning = new Transform[] 
{Transforms.day(columns[1].name())};
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table tableWithPartitionAndSortorder =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            sortOrders);
+    Assertions.assertEquals(tableName, tableWithPartitionAndSortorder.name());
+    Assertions.assertEquals(Distributions.RANGE, 
tableWithPartitionAndSortorder.distribution());
+
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadTable.name());
+    Assertions.assertEquals(Distributions.RANGE, loadTable.distribution());
+    tableCatalog.dropTable(tableIdentifier);
+
+    Table tableWithPartition =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            new SortOrder[0]);
+    Assertions.assertEquals(tableName, tableWithPartition.name());
+    Assertions.assertEquals(Distributions.HASH, 
tableWithPartition.distribution());
+
+    loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadTable.name());
+    Assertions.assertEquals(Distributions.HASH, loadTable.distribution());
+    tableCatalog.dropTable(tableIdentifier);
+
+    Table tableWithoutPartitionAndSortOrder =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            new Transform[0],
+            distribution,
+            new SortOrder[0]);
+    Assertions.assertEquals(tableName, 
tableWithoutPartitionAndSortOrder.name());
+    Assertions.assertEquals(Distributions.NONE, 
tableWithoutPartitionAndSortOrder.distribution());
+
+    loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadTable.name());
+    Assertions.assertEquals(Distributions.NONE, loadTable.distribution());
+  }
+
   @Test
   void testCreateAndLoadIcebergTable() {
     // Create table from Gravitino API
@@ -968,9 +1038,9 @@ public abstract class CatalogIcebergBaseIT extends BaseIT {
         columns,
         table_comment,
         properties,
-        partitioning,
+        new Transform[0],
         distribution,
-        sortOrders);
+        new SortOrder[0]);
 
     Table loadTable = tableCatalog.loadTable(tableIdentifier);
 
@@ -981,8 +1051,8 @@ public abstract class CatalogIcebergBaseIT extends BaseIT {
         Arrays.asList(columns),
         properties,
         distribution,
-        sortOrders,
-        partitioning,
+        new SortOrder[0],
+        new Transform[0],
         loadTable);
 
     Assertions.assertDoesNotThrow(() -> 
tableCatalog.dropTable(tableIdentifier));
@@ -1179,7 +1249,7 @@ public abstract class CatalogIcebergBaseIT extends BaseIT 
{
     Column[] columns = createColumns();
 
     NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
-    Distribution distribution = Distributions.NONE;
+    Distribution distribution = Distributions.HASH;
 
     final SortOrder[] sortOrders =
         new SortOrder[] {
diff --git a/docs/lakehouse-iceberg-catalog.md 
b/docs/lakehouse-iceberg-catalog.md
index 6ad011d716..f8b462b24e 100644
--- a/docs/lakehouse-iceberg-catalog.md
+++ b/docs/lakehouse-iceberg-catalog.md
@@ -220,79 +220,12 @@ For `bucket` and `truncate`, the first argument must be 
integer literal, and the
 
 ### Table distributions
 
-- Gravitino used by default `NoneDistribution`.
+- Support `HashDistribution`, which distributes data by partition key.
+- Support `RangeDistribution`, which distributes data by partition key or sort 
key for a SortOrder table.
+- Doesn't support `EvenDistribution`.
 
-<Tabs groupId='language' queryString>
-<TabItem value="json" label="JSON">
-
-```json
-{
-  "strategy": "none",
-  "number": 0,
-  "expressions": []
-}
-```
-
-</TabItem>
-<TabItem value="java" label="Java">
-
-```java
-Distributions.NONE;
-```
-
-</TabItem>
-</Tabs>
-
-- Support `HashDistribution`, Hash distribute by partition key.
-
-<Tabs groupId='language' queryString>
-<TabItem value="json" label="JSON">
-
-```json
-{
-  "strategy": "hash",
-  "number": 0,
-  "expressions": []
-}
-```
-</TabItem>
-<TabItem value="java" label="Java">
-
-```java
-Distributions.HASH;
-```
-
-</TabItem>
-</Tabs>
-
-- Support `RangeDistribution`, You can pass `range` as values through the API. 
Range distribute by partition key or sort key if table has an SortOrder.
-
-<Tabs groupId='language' queryString>
-<TabItem value="json" label="JSON">
-
-```json
-{
-  "strategy": "range",
-  "number": 0,
-  "expressions": []
-}
-```
-
-</TabItem>
-<TabItem value="java" label="Java">
-
-```java
-Distributions.RANGE;
-```
-
-</TabItem>
-</Tabs>
-
-:::info
-Iceberg automatically distributes the data according to the partition or table 
sort order. It is forbidden to specify distribution expressions.
-:::
 :::info
-Apache Iceberg doesn't support Gravitino `EvenDistribution` type.
+If you don't specify distribution expressions, the table distribution will 
be adjusted to `RangeDistribution` for a sort order table, and to 
`HashDistribution` for a partition table.
 :::
 
 ### Table column types

Reply via email to