yuqi1129 commented on code in PR #9858:
URL: https://github.com/apache/gravitino/pull/9858#discussion_r2781602472
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean
onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine =
ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine =
ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ if (!onCluster) {
+ throw new IllegalArgumentException(
+ "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be
specified.");
+ }
Review Comment:
I would like to know: if `ON CLUSTER` is absent, is the Distributed table
then also a local table?
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean
onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine =
ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine =
ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ if (!onCluster) {
+ throw new IllegalArgumentException(
+ "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be
specified.");
+ }
+
+ // Check properties
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ String remoteDatabase =
properties.get(DistributedTableConstants.REMOTE_DATABASE);
+ String remoteTable =
properties.get(DistributedTableConstants.REMOTE_TABLE);
+ String shardingKey =
properties.get(DistributedTableConstants.SHARDING_KEY);
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clusterName),
+ "Cluster name must be specified when engine is Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteDatabase),
+ "Remote database must be specified for Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteTable), "Remote table must be specified
for Distributed");
+
+ // User must ensure the sharding key is a trusted value
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(shardingKey), "Sharding key must be specified
for Distributed");
+
+ sqlBuilder.append(
+ "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+ .formatted(
+ ENGINE.DISTRIBUTED.getValue(),
+ clusterName,
+ remoteDatabase,
+ remoteTable,
+ shardingKey));
+ return engine;
+ }
+
sqlBuilder.append("\n ENGINE = %s".formatted(engine.getValue()));
return engine;
}
+ private void appendPartitionClause(
+ Transform[] partitioning,
+ StringBuilder sqlBuilder,
+ ClickHouseTablePropertiesMetadata.ENGINE engine) {
+ if (ArrayUtils.isEmpty(partitioning)) {
+ return;
+ }
+
+ if (!engine.acceptPartition()) {
+ throw new UnsupportedOperationException(
+ "Partitioning is only supported for MergeTree family engines");
+ }
+
+ List<String> partitionExprs =
+
Arrays.stream(partitioning).map(this::toPartitionExpression).collect(Collectors.toList());
+ String partitionExpr =
+ partitionExprs.size() == 1
+ ? partitionExprs.get(0)
+ : "tuple(" + String.join(", ", partitionExprs) + ")";
+ sqlBuilder.append("\n PARTITION BY ").append(partitionExpr);
+ }
+
+ private String toPartitionExpression(Transform transform) {
+ Preconditions.checkArgument(transform != null, "Partition transform cannot
be null");
+ Preconditions.checkArgument(
+ StringUtils.equalsIgnoreCase(transform.name(),
Transforms.NAME_OF_IDENTITY),
+ "Unsupported partition transform: " + transform.name());
+ Preconditions.checkArgument(
+ transform.arguments().length == 1
+ && transform.arguments()[0] instanceof NamedReference
+ && ((NamedReference) transform.arguments()[0]).fieldName().length
== 1,
+ "ClickHouse only supports single column identity partitioning");
Review Comment:
Yes
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]