jackye1995 commented on a change in pull request #3275:
URL: https://github.com/apache/iceberg/pull/3275#discussion_r742469306
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size(); i++) {
+ if (i != 0) {
+ sqlStatement.append(", ");
+ }
+ sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+ }
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+ sql.setString(rowIndex + 3, keyValue.getKey());
+ sql.setString(rowIndex + 4, keyValue.getValue());
+ rowIndex += 4;
+ }
+ return sql.executeUpdate();
+ }
+ });
+
+ if (insertedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}",
namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to insertProperties to
namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to insertProperties(namespace, properties)
Namespace: %s", namespace);
+ } catch (SQLException e) {
+ throw new UncheckedSQLException(e, "Failed to insertProperties to
namespace: %s in catalog: %s", namespace,
+ catalogName);
+ }
+ }
+
+ private boolean updateProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int updatedRecords = connections.run(conn -> {
+ StringBuilder valueCases = new StringBuilder();
Review comment:
I think we can have a `JdbcUtil.updatePropertiesStatement(properties)`
to formulate the SQL string, instead of defining pieces of the string there and
formulating the SQL here. This will allow you to use a single string builder
to build the complete statement.
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size(); i++) {
+ if (i != 0) {
+ sqlStatement.append(", ");
+ }
+ sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+ }
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+ sql.setString(rowIndex + 3, keyValue.getKey());
+ sql.setString(rowIndex + 4, keyValue.getValue());
+ rowIndex += 4;
+ }
+ return sql.executeUpdate();
+ }
+ });
+
+ if (insertedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}",
namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to insertProperties to
namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to insertProperties(namespace, properties)
Namespace: %s", namespace);
+ } catch (SQLException e) {
+ throw new UncheckedSQLException(e, "Failed to insertProperties to
namespace: %s in catalog: %s", namespace,
+ catalogName);
+ }
+ }
+
+ private boolean updateProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int updatedRecords = connections.run(conn -> {
+ StringBuilder valueCases = new StringBuilder();
+ for (int i = 0; i < properties.size(); i++) {
+ valueCases.append(" \n " + JdbcUtil.UPDATE_PROPERTIES_VALUES_BASE);
+ }
+
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.UPDATE_NAMESPACE_PROPERTIES_SQL.replace("{}",
+ valueCases.toString()));
+
+ String values = String.join(",",
Collections.nCopies(properties.size(), String.valueOf('?')));
+ sqlStatement.append("(").append(values).append(")");
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
Review comment:
Typically, incrementing the row index while building the SQL statement can be
done just by using `rowIndex++`, instead of +1, +2 and then incrementing by 2.
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -340,14 +460,123 @@ public boolean dropNamespace(Namespace namespace) throws
NamespaceNotEmptyExcept
@Override
public boolean setProperties(Namespace namespace, Map<String, String>
properties) throws
NoSuchNamespaceException {
- throw new UnsupportedOperationException(
- "Cannot set properties " + namespace + " : setProperties is not
supported");
+ if (!namespaceExists(namespace)) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s",
namespace);
+ }
+
+ if (properties == null || properties.isEmpty()) {
+ throw new IllegalArgumentException("Cannot setProperties to a namespace
with null or empty properties");
+ }
+
+ Map<String, String> namespaceProperties = getProperties(namespace);
+ Map<String, String> propertiesToInsert = new HashMap<>();
+ Map<String, String> propertiesToUpdate = new HashMap<>();
+
+ for (String key : properties.keySet()) {
+ String value = properties.get(key);
+ if (namespaceProperties.containsKey(key)) {
+ if (!namespaceProperties.get(key).equals(value)) {
+ propertiesToUpdate.put(key, value);
+ }
+ } else {
+ propertiesToInsert.put(key, value);
+ }
+ }
+
+ boolean insertedProperties = true;
+ if (!propertiesToInsert.isEmpty()) {
+ insertedProperties = insertProperties(namespace, propertiesToInsert);
+ }
+
+ boolean updatedProperties = true;
+ if (!propertiesToUpdate.isEmpty()) {
+ updatedProperties = updateProperties(namespace, propertiesToUpdate);
+ }
+
+ return (insertedProperties && updatedProperties);
}
@Override
public boolean removeProperties(Namespace namespace, Set<String> properties)
throws NoSuchNamespaceException {
- throw new UnsupportedOperationException(
- "Cannot remove properties " + namespace + " : removeProperties is not
supported");
+ if (!namespaceExists(namespace)) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s",
namespace);
+ }
+
+ if (properties == null || properties.isEmpty()) {
+ throw new IllegalArgumentException("Cannot removeProperties with null or
empty properties");
+ }
+
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int deletedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.DELETE_NAMESPACE_PROPERTIES_SQL);
+ String values = String.join(",",
Collections.nCopies(properties.size(), String.valueOf('?')));
+ sqlStatement.append("(").append(values).append(")");
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ sql.setString(1, catalogName);
+ sql.setString(2, namespaceName);
+ int valueIndex = 2;
+ for (String property : properties) {
+ sql.setString(valueIndex + 1, property);
Review comment:
can use ++
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size(); i++) {
+ if (i != 0) {
+ sqlStatement.append(", ");
+ }
+ sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+ }
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+ sql.setString(rowIndex + 3, keyValue.getKey());
+ sql.setString(rowIndex + 4, keyValue.getValue());
+ rowIndex += 4;
+ }
+ return sql.executeUpdate();
+ }
+ });
+
+ if (insertedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}",
namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to insertProperties to
namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to insertProperties(namespace, properties)
Namespace: %s", namespace);
Review comment:
Please also check whether your other error messages have the same issue,
thanks!
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -29,6 +29,8 @@
import java.sql.SQLTimeoutException;
import java.sql.SQLTransientConnectionException;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
Review comment:
prefer to use Guava `Maps.newHashMap()`
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size(); i++) {
+ if (i != 0) {
+ sqlStatement.append(", ");
+ }
+ sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+ }
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+ sql.setString(rowIndex + 3, keyValue.getKey());
+ sql.setString(rowIndex + 4, keyValue.getValue());
+ rowIndex += 4;
+ }
+ return sql.executeUpdate();
+ }
+ });
+
+ if (insertedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}",
namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to insertProperties to
namespace %s in catalog %s", namespaceName,
Review comment:
nit: prefer human-friendly language over code identifiers in error
messages, like "Failed to insert properties to namespace %s in catalog %s"
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size(); i++) {
+ if (i != 0) {
+ sqlStatement.append(", ");
+ }
+ sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+ }
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+ sql.setString(rowIndex + 3, keyValue.getKey());
+ sql.setString(rowIndex + 4, keyValue.getValue());
+ rowIndex += 4;
+ }
+ return sql.executeUpdate();
+ }
+ });
+
+ if (insertedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}",
namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to insertProperties to
namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to insertProperties(namespace, properties)
Namespace: %s", namespace);
Review comment:
nit: prefer human-friendly language over code identifiers in error
messages, like "Interrupted when inserting properties to namespace: %s"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]