jackye1995 commented on a change in pull request #3275:
URL: https://github.com/apache/iceberg/pull/3275#discussion_r742469306



##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
     this.conf = conf;
   }
 
+  private boolean insertProperties(Namespace namespace, Map<String, String> 
properties) {
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int insertedRecords = connections.run(conn -> {
+        StringBuilder sqlStatement = new 
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+        for (int i = 0; i < properties.size(); i++) {
+          if (i != 0) {
+            sqlStatement.append(", ");
+          }
+          sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+        }
+
+        try (PreparedStatement sql = 
conn.prepareStatement(sqlStatement.toString())) {
+          int rowIndex = 0;
+          for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+            sql.setString(rowIndex + 1, catalogName);
+            sql.setString(rowIndex + 2, namespaceName);
+            sql.setString(rowIndex + 3, keyValue.getKey());
+            sql.setString(rowIndex + 4, keyValue.getValue());
+            rowIndex += 4;
+          }
+          return sql.executeUpdate();
+        }
+      });
+
+      if (insertedRecords == properties.size()) {
+        LOG.debug("Successfully committed to new namespace: {}", 
namespaceName);
+        return true;
+      } else {
+        throw new CommitFailedException("Failed to insertProperties to 
namespace %s in catalog %s", namespaceName,
+                catalogName);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new UncheckedInterruptedException(e,
+              "Interrupted in call to insertProperties(namespace, properties) 
Namespace: %s", namespace);
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to insertProperties to 
namespace: %s in catalog: %s", namespace,
+              catalogName);
+    }
+  }
+
+  private boolean updateProperties(Namespace namespace, Map<String, String> 
properties) {
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int updatedRecords = connections.run(conn -> {
+        StringBuilder valueCases = new StringBuilder();

Review comment:
       I think we can have a `JdbcUtil.updatePropertiesStatement(properties)` 
to formulate the SQL string, instead of defining pieces of the string there and 
formulate the SQL here. This will allow you to only use a single string builder 
to complete the full statement sentence.

##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
     this.conf = conf;
   }
 
+  private boolean insertProperties(Namespace namespace, Map<String, String> 
properties) {
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int insertedRecords = connections.run(conn -> {
+        StringBuilder sqlStatement = new 
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+        for (int i = 0; i < properties.size(); i++) {
+          if (i != 0) {
+            sqlStatement.append(", ");
+          }
+          sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+        }
+
+        try (PreparedStatement sql = 
conn.prepareStatement(sqlStatement.toString())) {
+          int rowIndex = 0;
+          for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+            sql.setString(rowIndex + 1, catalogName);
+            sql.setString(rowIndex + 2, namespaceName);
+            sql.setString(rowIndex + 3, keyValue.getKey());
+            sql.setString(rowIndex + 4, keyValue.getValue());
+            rowIndex += 4;
+          }
+          return sql.executeUpdate();
+        }
+      });
+
+      if (insertedRecords == properties.size()) {
+        LOG.debug("Successfully committed to new namespace: {}", 
namespaceName);
+        return true;
+      } else {
+        throw new CommitFailedException("Failed to insertProperties to 
namespace %s in catalog %s", namespaceName,
+                catalogName);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new UncheckedInterruptedException(e,
+              "Interrupted in call to insertProperties(namespace, properties) 
Namespace: %s", namespace);
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to insertProperties to 
namespace: %s in catalog: %s", namespace,
+              catalogName);
+    }
+  }
+
+  private boolean updateProperties(Namespace namespace, Map<String, String> 
properties) {
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int updatedRecords = connections.run(conn -> {
+        StringBuilder valueCases = new StringBuilder();
+        for (int i = 0; i < properties.size(); i++) {
+          valueCases.append(" \n " + JdbcUtil.UPDATE_PROPERTIES_VALUES_BASE);
+        }
+
+        StringBuilder sqlStatement = new 
StringBuilder(JdbcUtil.UPDATE_NAMESPACE_PROPERTIES_SQL.replace("{}",
+            valueCases.toString()));
+
+        String values = String.join(",", 
Collections.nCopies(properties.size(), String.valueOf('?')));
+        sqlStatement.append("(").append(values).append(")");
+
+        try (PreparedStatement sql = 
conn.prepareStatement(sqlStatement.toString())) {
+          int rowIndex = 0;

Review comment:
       typically this incremental of row index in SQL formation can be done 
just by using `rowIndex++`, instead of +1, +2 and then increment by 2.

##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -340,14 +460,123 @@ public boolean dropNamespace(Namespace namespace) throws 
NamespaceNotEmptyExcept
   @Override
   public boolean setProperties(Namespace namespace, Map<String, String> 
properties) throws
       NoSuchNamespaceException {
-    throw new UnsupportedOperationException(
-        "Cannot set properties " + namespace + " : setProperties is not 
supported");
+    if (!namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace does not exist: %s", 
namespace);
+    }
+
+    if (properties == null || properties.isEmpty()) {
+      throw new IllegalArgumentException("Cannot setProperties to a namespace 
with null or empty properties");
+    }
+
+    Map<String, String> namespaceProperties = getProperties(namespace);
+    Map<String, String> propertiesToInsert = new HashMap<>();
+    Map<String, String> propertiesToUpdate = new HashMap<>();
+
+    for (String key : properties.keySet()) {
+      String value = properties.get(key);
+      if (namespaceProperties.containsKey(key)) {
+        if (!namespaceProperties.get(key).equals(value)) {
+          propertiesToUpdate.put(key, value);
+        }
+      } else {
+        propertiesToInsert.put(key, value);
+      }
+    }
+
+    boolean insertedProperties = true;
+    if (!propertiesToInsert.isEmpty()) {
+      insertedProperties = insertProperties(namespace, propertiesToInsert);
+    }
+
+    boolean updatedProperties = true;
+    if (!propertiesToUpdate.isEmpty()) {
+      updatedProperties = updateProperties(namespace, propertiesToUpdate);
+    }
+
+    return (insertedProperties && updatedProperties);
   }
 
   @Override
   public boolean removeProperties(Namespace namespace, Set<String> properties) 
throws NoSuchNamespaceException {
-    throw new UnsupportedOperationException(
-        "Cannot remove properties " + namespace + " : removeProperties is not 
supported");
+    if (!namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace does not exist: %s", 
namespace);
+    }
+
+    if (properties == null || properties.isEmpty()) {
+      throw new IllegalArgumentException("Cannot removeProperties with null or 
empty properties");
+    }
+
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int deletedRecords = connections.run(conn -> {
+        StringBuilder sqlStatement = new 
StringBuilder(JdbcUtil.DELETE_NAMESPACE_PROPERTIES_SQL);
+        String values = String.join(",", 
Collections.nCopies(properties.size(), String.valueOf('?')));
+        sqlStatement.append("(").append(values).append(")");
+
+        try (PreparedStatement sql = 
conn.prepareStatement(sqlStatement.toString())) {
+          sql.setString(1, catalogName);
+          sql.setString(2, namespaceName);
+          int valueIndex = 2;
+          for (String property : properties) {
+            sql.setString(valueIndex + 1, property);

Review comment:
       can use ++

##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
     this.conf = conf;
   }
 
+  private boolean insertProperties(Namespace namespace, Map<String, String> 
properties) {
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int insertedRecords = connections.run(conn -> {
+        StringBuilder sqlStatement = new 
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+        for (int i = 0; i < properties.size(); i++) {
+          if (i != 0) {
+            sqlStatement.append(", ");
+          }
+          sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+        }
+
+        try (PreparedStatement sql = 
conn.prepareStatement(sqlStatement.toString())) {
+          int rowIndex = 0;
+          for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+            sql.setString(rowIndex + 1, catalogName);
+            sql.setString(rowIndex + 2, namespaceName);
+            sql.setString(rowIndex + 3, keyValue.getKey());
+            sql.setString(rowIndex + 4, keyValue.getValue());
+            rowIndex += 4;
+          }
+          return sql.executeUpdate();
+        }
+      });
+
+      if (insertedRecords == properties.size()) {
+        LOG.debug("Successfully committed to new namespace: {}", 
namespaceName);
+        return true;
+      } else {
+        throw new CommitFailedException("Failed to insertProperties to 
namespace %s in catalog %s", namespaceName,
+                catalogName);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new UncheckedInterruptedException(e,
+              "Interrupted in call to insertProperties(namespace, properties) 
Namespace: %s", namespace);

Review comment:
       please also check your other error messages if they have the same issue, 
thanks!

##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -29,6 +29,8 @@
 import java.sql.SQLTimeoutException;
 import java.sql.SQLTransientConnectionException;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;

Review comment:
       prefer to use Guava `Maps.newHashMap()`

##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
     this.conf = conf;
   }
 
+  private boolean insertProperties(Namespace namespace, Map<String, String> 
properties) {
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int insertedRecords = connections.run(conn -> {
+        StringBuilder sqlStatement = new 
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+        for (int i = 0; i < properties.size(); i++) {
+          if (i != 0) {
+            sqlStatement.append(", ");
+          }
+          sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+        }
+
+        try (PreparedStatement sql = 
conn.prepareStatement(sqlStatement.toString())) {
+          int rowIndex = 0;
+          for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+            sql.setString(rowIndex + 1, catalogName);
+            sql.setString(rowIndex + 2, namespaceName);
+            sql.setString(rowIndex + 3, keyValue.getKey());
+            sql.setString(rowIndex + 4, keyValue.getValue());
+            rowIndex += 4;
+          }
+          return sql.executeUpdate();
+        }
+      });
+
+      if (insertedRecords == properties.size()) {
+        LOG.debug("Successfully committed to new namespace: {}", 
namespaceName);
+        return true;
+      } else {
+        throw new CommitFailedException("Failed to insertProperties to 
namespace %s in catalog %s", namespaceName,

Review comment:
       nit: prefer to not use code but human friendly language in error 
message, like "Failed to insert properties to namespace %s in catalog %s"

##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
     this.conf = conf;
   }
 
+  private boolean insertProperties(Namespace namespace, Map<String, String> 
properties) {
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int insertedRecords = connections.run(conn -> {
+        StringBuilder sqlStatement = new 
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+        for (int i = 0; i < properties.size(); i++) {
+          if (i != 0) {
+            sqlStatement.append(", ");
+          }
+          sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+        }
+
+        try (PreparedStatement sql = 
conn.prepareStatement(sqlStatement.toString())) {
+          int rowIndex = 0;
+          for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+            sql.setString(rowIndex + 1, catalogName);
+            sql.setString(rowIndex + 2, namespaceName);
+            sql.setString(rowIndex + 3, keyValue.getKey());
+            sql.setString(rowIndex + 4, keyValue.getValue());
+            rowIndex += 4;
+          }
+          return sql.executeUpdate();
+        }
+      });
+
+      if (insertedRecords == properties.size()) {
+        LOG.debug("Successfully committed to new namespace: {}", 
namespaceName);
+        return true;
+      } else {
+        throw new CommitFailedException("Failed to insertProperties to 
namespace %s in catalog %s", namespaceName,
+                catalogName);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new UncheckedInterruptedException(e,
+              "Interrupted in call to insertProperties(namespace, properties) 
Namespace: %s", namespace);

Review comment:
       nit: prefer to not use code but human friendly language in error 
message, like "Interrupted when inserting properties to namespace: %s"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to