nssalian commented on a change in pull request #3275:
URL: https://github.com/apache/iceberg/pull/3275#discussion_r742472593



##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
     this.conf = conf;
   }
 
+  private boolean insertProperties(Namespace namespace, Map<String, String> 
properties) {
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int insertedRecords = connections.run(conn -> {
+        StringBuilder sqlStatement = new 
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+        for (int i = 0; i < properties.size(); i++) {
+          if (i != 0) {
+            sqlStatement.append(", ");
+          }
+          sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+        }
+
+        try (PreparedStatement sql = 
conn.prepareStatement(sqlStatement.toString())) {
+          int rowIndex = 0;
+          for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+            sql.setString(rowIndex + 1, catalogName);
+            sql.setString(rowIndex + 2, namespaceName);
+            sql.setString(rowIndex + 3, keyValue.getKey());
+            sql.setString(rowIndex + 4, keyValue.getValue());
+            rowIndex += 4;
+          }
+          return sql.executeUpdate();
+        }
+      });
+
+      if (insertedRecords == properties.size()) {
+        LOG.debug("Successfully committed to new namespace: {}", 
namespaceName);
+        return true;
+      } else {
+        throw new CommitFailedException("Failed to insertProperties to 
namespace %s in catalog %s", namespaceName,
+                catalogName);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new UncheckedInterruptedException(e,
+              "Interrupted in call to insertProperties(namespace, properties) 
Namespace: %s", namespace);

Review comment:
       Agreed, but the other methods use the same style as well:
   ```
   Interrupted in call to namespaceExists(namespace)
   Interrupted in call to listNamespaces(namespace) Namespace
   ```
   Do we want to change them all to be consistent? I followed what was in the 
file already. Let me know.

##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -340,14 +460,123 @@ public boolean dropNamespace(Namespace namespace) throws 
NamespaceNotEmptyExcept
   @Override
   public boolean setProperties(Namespace namespace, Map<String, String> 
properties) throws
       NoSuchNamespaceException {
-    throw new UnsupportedOperationException(
-        "Cannot set properties " + namespace + " : setProperties is not 
supported");
+    if (!namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace does not exist: %s", 
namespace);
+    }
+
+    if (properties == null || properties.isEmpty()) {
+      throw new IllegalArgumentException("Cannot setProperties to a namespace 
with null or empty properties");
+    }
+
+    Map<String, String> namespaceProperties = getProperties(namespace);
+    Map<String, String> propertiesToInsert = new HashMap<>();
+    Map<String, String> propertiesToUpdate = new HashMap<>();
+
+    for (String key : properties.keySet()) {
+      String value = properties.get(key);
+      if (namespaceProperties.containsKey(key)) {
+        if (!namespaceProperties.get(key).equals(value)) {
+          propertiesToUpdate.put(key, value);
+        }
+      } else {
+        propertiesToInsert.put(key, value);
+      }
+    }
+
+    boolean insertedProperties = true;
+    if (!propertiesToInsert.isEmpty()) {
+      insertedProperties = insertProperties(namespace, propertiesToInsert);
+    }
+
+    boolean updatedProperties = true;
+    if (!propertiesToUpdate.isEmpty()) {
+      updatedProperties = updateProperties(namespace, propertiesToUpdate);
+    }
+
+    return (insertedProperties && updatedProperties);
   }
 
   @Override
   public boolean removeProperties(Namespace namespace, Set<String> properties) 
throws NoSuchNamespaceException {
-    throw new UnsupportedOperationException(
-        "Cannot remove properties " + namespace + " : removeProperties is not 
supported");
+    if (!namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace does not exist: %s", 
namespace);
+    }
+
+    if (properties == null || properties.isEmpty()) {
+      throw new IllegalArgumentException("Cannot removeProperties with null or 
empty properties");
+    }
+
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int deletedRecords = connections.run(conn -> {
+        StringBuilder sqlStatement = new 
StringBuilder(JdbcUtil.DELETE_NAMESPACE_PROPERTIES_SQL);
+        String values = String.join(",", 
Collections.nCopies(properties.size(), String.valueOf('?')));
+        sqlStatement.append("(").append(values).append(")");
+
+        try (PreparedStatement sql = 
conn.prepareStatement(sqlStatement.toString())) {
+          sql.setString(1, catalogName);
+          sql.setString(2, namespaceName);
+          int valueIndex = 2;
+          for (String property : properties) {
+            sql.setString(valueIndex + 1, property);

Review comment:
       +1

##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
     this.conf = conf;
   }
 
+  private boolean insertProperties(Namespace namespace, Map<String, String> 
properties) {
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int insertedRecords = connections.run(conn -> {
+        StringBuilder sqlStatement = new 
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+        for (int i = 0; i < properties.size(); i++) {
+          if (i != 0) {
+            sqlStatement.append(", ");
+          }
+          sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+        }
+
+        try (PreparedStatement sql = 
conn.prepareStatement(sqlStatement.toString())) {
+          int rowIndex = 0;
+          for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+            sql.setString(rowIndex + 1, catalogName);
+            sql.setString(rowIndex + 2, namespaceName);
+            sql.setString(rowIndex + 3, keyValue.getKey());
+            sql.setString(rowIndex + 4, keyValue.getValue());
+            rowIndex += 4;
+          }
+          return sql.executeUpdate();
+        }
+      });
+
+      if (insertedRecords == properties.size()) {
+        LOG.debug("Successfully committed to new namespace: {}", 
namespaceName);
+        return true;
+      } else {
+        throw new CommitFailedException("Failed to insertProperties to 
namespace %s in catalog %s", namespaceName,
+                catalogName);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new UncheckedInterruptedException(e,
+              "Interrupted in call to insertProperties(namespace, properties) 
Namespace: %s", namespace);
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to insertProperties to 
namespace: %s in catalog: %s", namespace,
+              catalogName);
+    }
+  }
+
+  private boolean updateProperties(Namespace namespace, Map<String, String> 
properties) {
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int updatedRecords = connections.run(conn -> {
+        StringBuilder valueCases = new StringBuilder();

Review comment:
       Agreed, I can make the same change for Insert as well.

##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
     this.conf = conf;
   }
 
+  private boolean insertProperties(Namespace namespace, Map<String, String> 
properties) {
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int insertedRecords = connections.run(conn -> {
+        StringBuilder sqlStatement = new 
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+        for (int i = 0; i < properties.size(); i++) {
+          if (i != 0) {
+            sqlStatement.append(", ");
+          }
+          sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+        }
+
+        try (PreparedStatement sql = 
conn.prepareStatement(sqlStatement.toString())) {
+          int rowIndex = 0;
+          for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+            sql.setString(rowIndex + 1, catalogName);
+            sql.setString(rowIndex + 2, namespaceName);
+            sql.setString(rowIndex + 3, keyValue.getKey());
+            sql.setString(rowIndex + 4, keyValue.getValue());
+            rowIndex += 4;
+          }
+          return sql.executeUpdate();
+        }
+      });
+
+      if (insertedRecords == properties.size()) {
+        LOG.debug("Successfully committed to new namespace: {}", 
namespaceName);
+        return true;
+      } else {
+        throw new CommitFailedException("Failed to insertProperties to 
namespace %s in catalog %s", namespaceName,
+                catalogName);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new UncheckedInterruptedException(e,
+              "Interrupted in call to insertProperties(namespace, properties) 
Namespace: %s", namespace);
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to insertProperties to 
namespace: %s in catalog: %s", namespace,
+              catalogName);
+    }
+  }
+
+  private boolean updateProperties(Namespace namespace, Map<String, String> 
properties) {
+    String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+    try {
+      int updatedRecords = connections.run(conn -> {
+        StringBuilder valueCases = new StringBuilder();
+        for (int i = 0; i < properties.size(); i++) {
+          valueCases.append(" \n " + JdbcUtil.UPDATE_PROPERTIES_VALUES_BASE);
+        }
+
+        StringBuilder sqlStatement = new 
StringBuilder(JdbcUtil.UPDATE_NAMESPACE_PROPERTIES_SQL.replace("{}",
+            valueCases.toString()));
+
+        String values = String.join(",", 
Collections.nCopies(properties.size(), String.valueOf('?')));
+        sqlStatement.append("(").append(values).append(")");
+
+        try (PreparedStatement sql = 
conn.prepareStatement(sqlStatement.toString())) {
+          int rowIndex = 0;

Review comment:
       I think we'll need `++rowIndex` here, since we need the incremented 
value. I tested this and it worked.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to