This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 54d1c2a352 [#6196] feat(iceberg): adjust table distribution if
creating table without specifying distribution mode (#6299)
54d1c2a352 is described below
commit 54d1c2a3523002201a8be52da8a83b0a65c67897
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jan 16 18:01:42 2025 +0800
[#6196] feat(iceberg): adjust table distribution if creating table without
specifying distribution mode (#6299)
### What changes were proposed in this pull request?
Adjust the distribution mode when creating an Iceberg table with none
distribution. The following is Spark's adjustment logic; Flink's is
similar.
```java
private DistributionMode defaultWriteDistributionMode() {
if (table.sortOrder().isSorted()) {
return RANGE;
} else if (table.spec().isPartitioned()) {
return HASH;
} else {
return NONE;
}
}
```
### Why are the changes needed?
Fix: #6196
### Does this PR introduce _any_ user-facing change?
Yes, the documentation is updated.
### How was this patch tested?
add UT and IT
Co-authored-by: FANNG <[email protected]>
---
.../iceberg/IcebergCatalogOperations.java | 18 +++++
.../catalog/lakehouse/iceberg/IcebergTable.java | 36 +++++-----
.../integration/test/CatalogIcebergBaseIT.java | 80 ++++++++++++++++++++--
docs/lakehouse-iceberg-catalog.md | 75 ++------------------
4 files changed, 117 insertions(+), 92 deletions(-)
diff --git
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
index 7b27438d2e..aef42044c0 100644
---
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
+++
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
@@ -58,6 +58,7 @@ import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
@@ -513,6 +514,13 @@ public class IcebergCatalogOperations implements
CatalogOperations, SupportsSche
.build())
.toArray(IcebergColumn[]::new);
+ // Gravitino NONE distribution means the client side doesn't specify
distribution, which is
+ // not the same as none distribution in Iceberg.
+ if (Distributions.NONE.equals(distribution)) {
+ distribution =
+ getIcebergDefaultDistribution(sortOrders.length > 0,
partitioning.length > 0);
+ }
+
IcebergTable createdTable =
IcebergTable.builder()
.withName(tableIdent.name())
@@ -588,6 +596,16 @@ public class IcebergCatalogOperations implements
CatalogOperations, SupportsSche
}
}
+ private static Distribution getIcebergDefaultDistribution(
+ boolean isSorted, boolean isPartitioned) {
+ if (isSorted) {
+ return Distributions.RANGE;
+ } else if (isPartitioned) {
+ return Distributions.HASH;
+ }
+ return Distributions.NONE;
+ }
+
private static String currentUser() {
return PrincipalUtils.getCurrentUserName();
}
diff --git
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
index 27e3c429e7..3f2f54c1b3 100644
---
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
+++
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java
@@ -152,21 +152,6 @@ public class IcebergTable extends BaseTable {
Schema schema = table.schema();
Transform[] partitionSpec =
FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema);
SortOrder[] sortOrder =
FromIcebergSortOrder.fromSortOrder(table.sortOrder());
- Distribution distribution = Distributions.NONE;
- String distributionName =
properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE);
- if (null != distributionName) {
- switch (DistributionMode.fromName(distributionName)) {
- case HASH:
- distribution = Distributions.HASH;
- break;
- case RANGE:
- distribution = Distributions.RANGE;
- break;
- default:
- // do nothing
- break;
- }
- }
IcebergColumn[] icebergColumns =
schema.columns().stream().map(ConvertUtil::fromNestedField).toArray(IcebergColumn[]::new);
return IcebergTable.builder()
@@ -178,7 +163,7 @@ public class IcebergTable extends BaseTable {
.withAuditInfo(AuditInfo.EMPTY)
.withPartitioning(partitionSpec)
.withSortOrders(sortOrder)
- .withDistribution(distribution)
+ .withDistribution(getDistribution(properties))
.build();
}
@@ -236,4 +221,23 @@ public class IcebergTable extends BaseTable {
public static Builder builder() {
return new Builder();
}
+
+ private static Distribution getDistribution(Map<String, String> properties) {
+ Distribution distribution = Distributions.NONE;
+ String distributionName =
properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE);
+ if (null != distributionName) {
+ switch (DistributionMode.fromName(distributionName)) {
+ case HASH:
+ distribution = Distributions.HASH;
+ break;
+ case RANGE:
+ distribution = Distributions.RANGE;
+ break;
+ default:
+ // do nothing
+ break;
+ }
+ }
+ return distribution;
+ }
}
diff --git
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
index fd37441b45..f0162a6ec8 100644
---
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
+++
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
@@ -379,6 +379,76 @@ public abstract class CatalogIcebergBaseIT extends BaseIT {
Assertions.assertNull(loadTable.comment());
}
+ @Test
+ void testCreateTableWithNoneDistribution() {
+ // Create table from Gravitino API
+ Column[] columns = createColumns();
+
+ NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+ Distribution distribution = Distributions.NONE;
+
+ final SortOrder[] sortOrders =
+ new SortOrder[] {
+ SortOrders.of(
+ NamedReference.field(ICEBERG_COL_NAME2),
+ SortDirection.DESCENDING,
+ NullOrdering.NULLS_FIRST)
+ };
+
+ Transform[] partitioning = new Transform[]
{Transforms.day(columns[1].name())};
+ Map<String, String> properties = createProperties();
+ TableCatalog tableCatalog = catalog.asTableCatalog();
+ Table tableWithPartitionAndSortorder =
+ tableCatalog.createTable(
+ tableIdentifier,
+ columns,
+ table_comment,
+ properties,
+ partitioning,
+ distribution,
+ sortOrders);
+ Assertions.assertEquals(tableName, tableWithPartitionAndSortorder.name());
+ Assertions.assertEquals(Distributions.RANGE,
tableWithPartitionAndSortorder.distribution());
+
+ Table loadTable = tableCatalog.loadTable(tableIdentifier);
+ Assertions.assertEquals(tableName, loadTable.name());
+ Assertions.assertEquals(Distributions.RANGE, loadTable.distribution());
+ tableCatalog.dropTable(tableIdentifier);
+
+ Table tableWithPartition =
+ tableCatalog.createTable(
+ tableIdentifier,
+ columns,
+ table_comment,
+ properties,
+ partitioning,
+ distribution,
+ new SortOrder[0]);
+ Assertions.assertEquals(tableName, tableWithPartition.name());
+ Assertions.assertEquals(Distributions.HASH,
tableWithPartition.distribution());
+
+ loadTable = tableCatalog.loadTable(tableIdentifier);
+ Assertions.assertEquals(tableName, loadTable.name());
+ Assertions.assertEquals(Distributions.HASH, loadTable.distribution());
+ tableCatalog.dropTable(tableIdentifier);
+
+ Table tableWithoutPartitionAndSortOrder =
+ tableCatalog.createTable(
+ tableIdentifier,
+ columns,
+ table_comment,
+ properties,
+ new Transform[0],
+ distribution,
+ new SortOrder[0]);
+ Assertions.assertEquals(tableName,
tableWithoutPartitionAndSortOrder.name());
+ Assertions.assertEquals(Distributions.NONE,
tableWithoutPartitionAndSortOrder.distribution());
+
+ loadTable = tableCatalog.loadTable(tableIdentifier);
+ Assertions.assertEquals(tableName, loadTable.name());
+ Assertions.assertEquals(Distributions.NONE, loadTable.distribution());
+ }
+
@Test
void testCreateAndLoadIcebergTable() {
// Create table from Gravitino API
@@ -968,9 +1038,9 @@ public abstract class CatalogIcebergBaseIT extends BaseIT {
columns,
table_comment,
properties,
- partitioning,
+ new Transform[0],
distribution,
- sortOrders);
+ new SortOrder[0]);
Table loadTable = tableCatalog.loadTable(tableIdentifier);
@@ -981,8 +1051,8 @@ public abstract class CatalogIcebergBaseIT extends BaseIT {
Arrays.asList(columns),
properties,
distribution,
- sortOrders,
- partitioning,
+ new SortOrder[0],
+ new Transform[0],
loadTable);
Assertions.assertDoesNotThrow(() ->
tableCatalog.dropTable(tableIdentifier));
@@ -1179,7 +1249,7 @@ public abstract class CatalogIcebergBaseIT extends BaseIT
{
Column[] columns = createColumns();
NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
- Distribution distribution = Distributions.NONE;
+ Distribution distribution = Distributions.HASH;
final SortOrder[] sortOrders =
new SortOrder[] {
diff --git a/docs/lakehouse-iceberg-catalog.md
b/docs/lakehouse-iceberg-catalog.md
index 6ad011d716..f8b462b24e 100644
--- a/docs/lakehouse-iceberg-catalog.md
+++ b/docs/lakehouse-iceberg-catalog.md
@@ -220,79 +220,12 @@ For `bucket` and `truncate`, the first argument must be
integer literal, and the
### Table distributions
-- Gravitino used by default `NoneDistribution`.
+- Supports `HashDistribution`, which distributes data by partition key.
+- Supports `RangeDistribution`, which distributes data by partition key, or by
sort key if the table has a sort order.
+- Doesn't support `EvenDistribution`.
-<Tabs groupId='language' queryString>
-<TabItem value="json" label="JSON">
-
-```json
-{
- "strategy": "none",
- "number": 0,
- "expressions": []
-}
-```
-
-</TabItem>
-<TabItem value="java" label="Java">
-
-```java
-Distributions.NONE;
-```
-
-</TabItem>
-</Tabs>
-
-- Support `HashDistribution`, Hash distribute by partition key.
-
-<Tabs groupId='language' queryString>
-<TabItem value="json" label="JSON">
-
-```json
-{
- "strategy": "hash",
- "number": 0,
- "expressions": []
-}
-```
-</TabItem>
-<TabItem value="java" label="Java">
-
-```java
-Distributions.HASH;
-```
-
-</TabItem>
-</Tabs>
-
-- Support `RangeDistribution`, You can pass `range` as values through the API.
Range distribute by partition key or sort key if table has an SortOrder.
-
-<Tabs groupId='language' queryString>
-<TabItem value="json" label="JSON">
-
-```json
-{
- "strategy": "range",
- "number": 0,
- "expressions": []
-}
-```
-
-</TabItem>
-<TabItem value="java" label="Java">
-
-```java
-Distributions.RANGE;
-```
-
-</TabItem>
-</Tabs>
-
-:::info
-Iceberg automatically distributes the data according to the partition or table
sort order. It is forbidden to specify distribution expressions.
-:::
:::info
-Apache Iceberg doesn't support Gravitino `EvenDistribution` type.
+If you don't specify distribution expressions, the table distribution will
be adjusted to `RangeDistribution` for a table with a sort order, or to
`HashDistribution` for a partitioned table.
:::
### Table column types