nssalian commented on a change in pull request #3275:
URL: https://github.com/apache/iceberg/pull/3275#discussion_r742472593
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size(); i++) {
+ if (i != 0) {
+ sqlStatement.append(", ");
+ }
+ sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+ }
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+ sql.setString(rowIndex + 3, keyValue.getKey());
+ sql.setString(rowIndex + 4, keyValue.getValue());
+ rowIndex += 4;
+ }
+ return sql.executeUpdate();
+ }
+ });
+
+ if (insertedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}",
namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to insertProperties to
namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to insertProperties(namespace, properties)
Namespace: %s", namespace);
Review comment:
Agreed, but the other methods use the same style as well:
```
Interrupted in call to namespaceExists(namespace)
Interrupted in call to listNamespaces(namespace) Namespace
```
Do we want to change them all to be consistent? I followed what was in the
file already. Let me know.
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -340,14 +460,123 @@ public boolean dropNamespace(Namespace namespace) throws
NamespaceNotEmptyExcept
@Override
public boolean setProperties(Namespace namespace, Map<String, String>
properties) throws
NoSuchNamespaceException {
- throw new UnsupportedOperationException(
- "Cannot set properties " + namespace + " : setProperties is not
supported");
+ if (!namespaceExists(namespace)) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s",
namespace);
+ }
+
+ if (properties == null || properties.isEmpty()) {
+ throw new IllegalArgumentException("Cannot setProperties to a namespace
with null or empty properties");
+ }
+
+ Map<String, String> namespaceProperties = getProperties(namespace);
+ Map<String, String> propertiesToInsert = new HashMap<>();
+ Map<String, String> propertiesToUpdate = new HashMap<>();
+
+ for (String key : properties.keySet()) {
+ String value = properties.get(key);
+ if (namespaceProperties.containsKey(key)) {
+ if (!namespaceProperties.get(key).equals(value)) {
+ propertiesToUpdate.put(key, value);
+ }
+ } else {
+ propertiesToInsert.put(key, value);
+ }
+ }
+
+ boolean insertedProperties = true;
+ if (!propertiesToInsert.isEmpty()) {
+ insertedProperties = insertProperties(namespace, propertiesToInsert);
+ }
+
+ boolean updatedProperties = true;
+ if (!propertiesToUpdate.isEmpty()) {
+ updatedProperties = updateProperties(namespace, propertiesToUpdate);
+ }
+
+ return (insertedProperties && updatedProperties);
}
@Override
public boolean removeProperties(Namespace namespace, Set<String> properties)
throws NoSuchNamespaceException {
- throw new UnsupportedOperationException(
- "Cannot remove properties " + namespace + " : removeProperties is not
supported");
+ if (!namespaceExists(namespace)) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s",
namespace);
+ }
+
+ if (properties == null || properties.isEmpty()) {
+ throw new IllegalArgumentException("Cannot removeProperties with null or
empty properties");
+ }
+
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int deletedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.DELETE_NAMESPACE_PROPERTIES_SQL);
+ String values = String.join(",",
Collections.nCopies(properties.size(), String.valueOf('?')));
+ sqlStatement.append("(").append(values).append(")");
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ sql.setString(1, catalogName);
+ sql.setString(2, namespaceName);
+ int valueIndex = 2;
+ for (String property : properties) {
+ sql.setString(valueIndex + 1, property);
Review comment:
+1
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size(); i++) {
+ if (i != 0) {
+ sqlStatement.append(", ");
+ }
+ sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+ }
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+ sql.setString(rowIndex + 3, keyValue.getKey());
+ sql.setString(rowIndex + 4, keyValue.getValue());
+ rowIndex += 4;
+ }
+ return sql.executeUpdate();
+ }
+ });
+
+ if (insertedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}",
namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to insertProperties to
namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to insertProperties(namespace, properties)
Namespace: %s", namespace);
+ } catch (SQLException e) {
+ throw new UncheckedSQLException(e, "Failed to insertProperties to
namespace: %s in catalog: %s", namespace,
+ catalogName);
+ }
+ }
+
+ private boolean updateProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int updatedRecords = connections.run(conn -> {
+ StringBuilder valueCases = new StringBuilder();
Review comment:
Agreed, I can make the same change for insert as well.
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,114 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size(); i++) {
+ if (i != 0) {
+ sqlStatement.append(", ");
+ }
+ sqlStatement.append(JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+ }
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+ sql.setString(rowIndex + 3, keyValue.getKey());
+ sql.setString(rowIndex + 4, keyValue.getValue());
+ rowIndex += 4;
+ }
+ return sql.executeUpdate();
+ }
+ });
+
+ if (insertedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}",
namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to insertProperties to
namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to insertProperties(namespace, properties)
Namespace: %s", namespace);
+ } catch (SQLException e) {
+ throw new UncheckedSQLException(e, "Failed to insertProperties to
namespace: %s in catalog: %s", namespace,
+ catalogName);
+ }
+ }
+
+ private boolean updateProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int updatedRecords = connections.run(conn -> {
+ StringBuilder valueCases = new StringBuilder();
+ for (int i = 0; i < properties.size(); i++) {
+ valueCases.append(" \n " + JdbcUtil.UPDATE_PROPERTIES_VALUES_BASE);
+ }
+
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.UPDATE_NAMESPACE_PROPERTIES_SQL.replace("{}",
+ valueCases.toString()));
+
+ String values = String.join(",",
Collections.nCopies(properties.size(), String.valueOf('?')));
+ sqlStatement.append("(").append(values).append(")");
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
Review comment:
I think we'll need `++rowIndex` since we need the incremented value.
Tested this and it worked.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]