yuqi1129 commented on code in PR #9858:
URL: https://github.com/apache/gravitino/pull/9858#discussion_r2781235790
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean
onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine =
ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
Review Comment:
I prefer to keep the property in the map.
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean
onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine =
ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine =
ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ if (!onCluster) {
+ throw new IllegalArgumentException(
+ "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be
specified.");
+ }
+
+ // Check properties
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ String remoteDatabase =
properties.get(DistributedTableConstants.REMOTE_DATABASE);
+ String remoteTable =
properties.get(DistributedTableConstants.REMOTE_TABLE);
+ String shardingKey =
properties.get(DistributedTableConstants.SHARDING_KEY);
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clusterName),
+ "Cluster name must be specified when engine is Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteDatabase),
+ "Remote database must be specified for Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteTable), "Remote table must be specified
for Distributed");
+
+ // User must ensure the sharding key is a trusted value
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(shardingKey), "Sharding key must be specified
for Distributed");
+
+ sqlBuilder.append(
+ "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+ .formatted(
+ ENGINE.DISTRIBUTED.getValue(),
+ clusterName,
+ remoteDatabase,
+ remoteTable,
+ shardingKey));
Review Comment:
> Why don’t we use the expression with shardingKey?
What do you mean here? Converting it to a Gravitino `Expression`?
##########
catalogs-contrib/catalog-jdbc-clickhouse/src/main/java/org/apache/gravitino/catalog/clickhouse/operations/ClickHouseTableOperations.java:
##########
@@ -228,18 +273,93 @@ private static void appendOrderBy(
}
private ClickHouseTablePropertiesMetadata.ENGINE appendTableEngine(
- Map<String, String> properties, StringBuilder sqlBuilder) {
+ Map<String, String> properties, StringBuilder sqlBuilder, boolean
onCluster) {
ClickHouseTablePropertiesMetadata.ENGINE engine =
ENGINE_PROPERTY_ENTRY.getDefaultValue();
if (MapUtils.isNotEmpty(properties)) {
- String userSetEngine = properties.remove(CLICKHOUSE_ENGINE_KEY);
+ String userSetEngine = properties.get(CLICKHOUSE_ENGINE_KEY);
if (StringUtils.isNotEmpty(userSetEngine)) {
engine =
ClickHouseTablePropertiesMetadata.ENGINE.fromString(userSetEngine);
}
}
+
+ if (engine == ENGINE.DISTRIBUTED) {
+ if (!onCluster) {
+ throw new IllegalArgumentException(
+ "ENGINE = DISTRIBUTED requires ON CLUSTER clause to be
specified.");
+ }
+
+ // Check properties
+ String clusterName = properties.get(ClusterConstants.CLUSTER_NAME);
+ String remoteDatabase =
properties.get(DistributedTableConstants.REMOTE_DATABASE);
+ String remoteTable =
properties.get(DistributedTableConstants.REMOTE_TABLE);
+ String shardingKey =
properties.get(DistributedTableConstants.SHARDING_KEY);
+
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clusterName),
+ "Cluster name must be specified when engine is Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteDatabase),
+ "Remote database must be specified for Distributed");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(remoteTable), "Remote table must be specified
for Distributed");
+
+ // User must ensure the sharding key is a trusted value
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(shardingKey), "Sharding key must be specified
for Distributed");
+
+ sqlBuilder.append(
+ "\n ENGINE = %s(`%s`,`%s`,`%s`,%s)"
+ .formatted(
+ ENGINE.DISTRIBUTED.getValue(),
+ clusterName,
+ remoteDatabase,
+ remoteTable,
+ shardingKey));
Review Comment:
> shardingKey may be a function , so it should not be escaped
Yeah, I'm handling it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]