kbendick commented on a change in pull request #3275:
URL: https://github.com/apache/iceberg/pull/3275#discussion_r741594748
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,111 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String> properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size() - 1; i++) {
+ sqlStatement.append(", " + JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
Review comment:
Nit: Since you're already using the `StringBuilder`, don't concatenate the strings. Just call `.append` twice: once for the comma and once for the values placeholder.
##########
File path: core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
##########
@@ -555,6 +556,66 @@ public void testDropNamespace() {
"is not empty. 1 tables exist.", () ->
catalog.dropNamespace(tbl4.namespace()));
}
+ @Test
+ public void testCreateNamespace() {
+ Namespace testNamespace = Namespace.of("testDb", "ns1", "ns2");
+ // Test with null metadata
+ AssertHelpers.assertThrows("Cannot create a namespace with null or empty metadata", IllegalArgumentException.class,
+ () -> catalog.createNamespace(testNamespace, null));
+ Assert.assertFalse(catalog.namespaceExists(testNamespace));
Review comment:
Question for my own understanding:
In Spark, for example, when we call `CREATE DATABASE default`, what metadata gets added?
I see these two Spark statements in the tests. The first looks like it could produce null or empty metadata.
```
sql("CREATE NAMESPACE %s", fullNamespace);
sql("CREATE NAMESPACE %s WITH PROPERTIES ('prop'='value')", fullNamespace);
```
Are we sure that metadata can't be empty, in any engine?
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -116,6 +119,19 @@ private void initializeCatalogTables() throws InterruptedException, SQLException
LOG.debug("Creating table {} to store iceberg catalog", JdbcUtil.CATALOG_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute();
});
+
+ connections.run(conn -> {
+ DatabaseMetaData dbMeta = conn.getMetaData();
+ ResultSet tableExists = dbMeta.getTables(null, null, JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE, null);
+
+ if (tableExists.next()) {
+ return true;
Review comment:
Question: Please correct me if I'm wrong, but is this call to `getTables` being passed the actual CREATE TABLE DDL? Is there a reason the full CREATE TABLE statement needs to be passed here? Would just using the resulting table name, `JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME`, work instead? It seems like `getTables` would take the table name, not the full CREATE TABLE DDL with the schema and so on.
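A minimal sketch of the reviewer's suggestion, assuming a bare table-name constant like `JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME` exists; its value `iceberg_namespace_properties` is taken from the INSERT statement quoted in another comment and is otherwise an assumption. The sketch also uses the `namespacePropertiesTableListing` name suggested in a later nit and closes the `ResultSet` with try-with-resources:

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;

class NamespacePropertiesTableCheck {
  // Assumed value of a table-name constant; not confirmed by the PR.
  static final String NAMESPACE_PROPERTIES_TABLE_NAME = "iceberg_namespace_properties";

  // Returns true if the driver's metadata lists a table matching tableName.
  static boolean tableExists(Connection conn, String tableName) throws SQLException {
    DatabaseMetaData dbMeta = conn.getMetaData();
    // The third argument is a table-name pattern, so pass the bare name,
    // not the CREATE TABLE statement.
    try (ResultSet namespacePropertiesTableListing =
        dbMeta.getTables(null, null, tableName, null)) {
      return namespacePropertiesTableListing.next();
    }
  }
}
```

One caveat: `getTables` treats the name as a `LIKE`-style pattern (`_` and `%` are wildcards), and some databases, such as H2, store unquoted identifiers in upper case, so the lookup may need driver-specific handling of case.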
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,111 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String> properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size() - 1; i++) {
+ sqlStatement.append(", " + JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+ }
+
+ try (PreparedStatement sql = conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+ sql.setString(rowIndex + 3, keyValue.getKey());
+ sql.setString(rowIndex + 4, keyValue.getValue());
+ rowIndex += 4;
+ }
+ return sql.executeUpdate();
+ }
+ });
+
+ if (insertedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}", namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to insertProperties to namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to insertProperties(namespace, properties) Namespace: %s", namespace);
Review comment:
Question: Is an interrupt here expected in practice, or is this catch just being cautious?
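For reference, a minimal, self-contained sketch of the idiom the code uses: catch `InterruptedException`, restore the thread's interrupt flag, and rethrow an unchecked exception. `PoolAction` and `runInterruptibly` are hypothetical stand-ins for the action passed to `connections.run(...)`, and `IllegalStateException` stands in for Iceberg's `UncheckedInterruptedException`:

```java
class InterruptHandling {
  // Hypothetical stand-in for an action passed to connections.run(...)
  interface PoolAction<R> {
    R run() throws InterruptedException;
  }

  static <R> R runInterruptibly(PoolAction<R> action) {
    try {
      return action.run();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers up the stack can still observe it
      Thread.currentThread().interrupt();
      // Stand-in for UncheckedInterruptedException
      throw new IllegalStateException("Interrupted while running pool action", e);
    }
  }
}
```

Restoring the flag matters because blocking JDK methods such as `Thread.sleep` and `Object.wait` clear the thread's interrupted status when they throw `InterruptedException`.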
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -116,6 +119,19 @@ private void initializeCatalogTables() throws InterruptedException, SQLException
LOG.debug("Creating table {} to store iceberg catalog", JdbcUtil.CATALOG_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute();
});
+
+ connections.run(conn -> {
+ DatabaseMetaData dbMeta = conn.getMetaData();
+ ResultSet tableExists = dbMeta.getTables(null, null, JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE, null);
+
+ if (tableExists.next()) {
+ return true;
Review comment:
Nit: It's strange to see a `ResultSet` or any kind of iterator with a
boolean name. Due to my next comment, I'm not 100% sure about a better
alternative, but maybe `namespacePropertiesTableListing` or something might be
appropriate?
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,111 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String> properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size() - 1; i++) {
Review comment:
Nit: It took me quite some time to work out what is happening here. Could this be cleaned up a bit to help readers?
`INSERT_NAMESPACE_PROPERTIES_SQL` is `INSERT INTO iceberg_namespace_properties(catalog_name , namespace, key, value) VALUES (?,?,?,?)`.
The code then loops `properties.size() - 1` times, appending `, (?,?,?,?)` on each iteration.
So, a few concerns:
1) What happens if this function is called when `properties` is empty? Is that possible?
2) For readability, would it make sense to start with the INSERT INTO statement and then use Guava's `Joiner` on `,` (or a repeater) to join `properties.size()` copies of the placeholder tuple?
I had to spend quite some time verifying the correctness of this, and I'm still not entirely sure (though I assume it won't be called with an empty `properties` value).
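A minimal sketch of suggestion 2, using the JDK's `String.join` and `Collections.nCopies` (the same pair `updateProperties` already uses) in place of Guava's `Joiner`. `INSERT_PREFIX` and `VALUES_PLACEHOLDER` are assumed stand-ins for the `JdbcUtil` values, and rejecting an empty map is one possible answer to concern 1, not the PR's current behavior:

```java
import java.util.Collections;
import java.util.Map;

class InsertPropertiesSql {
  // Assumed: the INSERT statement without its VALUES tuple
  static final String INSERT_PREFIX =
      "INSERT INTO iceberg_namespace_properties (catalog_name, namespace, key, value) VALUES ";
  // Assumed: one placeholder tuple per property row
  static final String VALUES_PLACEHOLDER = "(?,?,?,?)";

  static String buildInsertSql(Map<String, String> properties) {
    // Fail fast instead of producing a statement with unbound parameters
    if (properties == null || properties.isEmpty()) {
      throw new IllegalArgumentException("Cannot insert an empty set of namespace properties");
    }
    // Exactly properties.size() tuples, comma-separated, with no off-by-one loop
    return INSERT_PREFIX
        + String.join(", ", Collections.nCopies(properties.size(), VALUES_PLACEHOLDER));
  }
}
```

With two properties this yields `... VALUES (?,?,?,?), (?,?,?,?)`, so the tuple count is visible directly from `properties.size()`.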
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,111 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String> properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size() - 1; i++) {
+ sqlStatement.append(", " + JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+ }
+
+ try (PreparedStatement sql = conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+ sql.setString(rowIndex + 3, keyValue.getKey());
+ sql.setString(rowIndex + 4, keyValue.getValue());
+ rowIndex += 4;
+ }
+ return sql.executeUpdate();
+ }
+ });
+
+ if (insertedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}", namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to insertProperties to namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to insertProperties(namespace, properties) Namespace: %s", namespace);
+ } catch (SQLException e) {
+ throw new UncheckedSQLException(e, "Failed to insertProperties to namespace: %s in catalog: %s", namespace,
+ catalogName);
+ }
+ }
+
+ private boolean updateProperties(Namespace namespace, Map<String, String> properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int updatedRecords = connections.run(conn -> {
+ StringBuilder valueCases = new StringBuilder();
+ for (int i = 0; i < properties.size(); i++) {
+ valueCases.append(" \n " + JdbcUtil.UPDATE_PROPERTIES_VALUES_BASE);
+ }
+
+ StringBuilder sqlStatement = new StringBuilder(JdbcUtil.UPDATE_NAMESPACE_PROPERTIES_SQL.replace("{}",
+ valueCases.toString()));
+
+ String values = String.join(",", Collections.nCopies(properties.size(), String.valueOf('?')));
+ sqlStatement.append("(").append(values).append(")");
+
+ try (PreparedStatement sql = conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, keyValue.getKey());
+ sql.setString(rowIndex + 2, keyValue.getValue());
+ rowIndex += 2;
+ }
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+
+ rowIndex += 2;
+ for (String key : properties.keySet()) {
+ sql.setString(rowIndex + 1, key);
+ rowIndex += 1;
+ }
+
+ return sql.executeUpdate();
+ }
+ });
+
+ if (updatedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}", namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to updateProperties to namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to updateProperties(namespace, properties) Namespace: %s", namespace);
+ } catch (SQLException e) {
+ throw new UncheckedSQLException(e, "Failed to updateProperties to namespace: %s in catalog: %s", namespace,
+ catalogName);
+ }
+ }
+
@Override
public void createNamespace(Namespace namespace, Map<String, String> metadata) {
- throw new UnsupportedOperationException("Cannot create namespace " + namespace +
- ": createNamespace is not supported");
+ if (namespaceExists(namespace)) {
+ throw new AlreadyExistsException("Namespace already exists: %s", namespace);
+ }
+
+ if (metadata == null || metadata.isEmpty()) {
+ throw new IllegalArgumentException("Cannot create a namespace with null or empty metadata");
+ }
Review comment:
Question: Are we sure that `metadata` can't be empty?
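If empty metadata should be allowed (for example, for Spark's `CREATE NAMESPACE %s` without `WITH PROPERTIES`), one option is to treat `null` as an empty map instead of rejecting it. The sketch below is hypothetical: an in-memory map stands in for the namespace properties table, `String` stands in for `Namespace`, and `IllegalStateException` stands in for `AlreadyExistsException`. It does not address how the JDBC catalog would record a namespace that has no property rows.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class LenientNamespaceStore {
  // Hypothetical in-memory stand-in for the namespace properties table
  private final Map<String, Map<String, String>> namespaces = new HashMap<>();

  void createNamespace(String namespace, Map<String, String> metadata) {
    if (namespaces.containsKey(namespace)) {
      // Stand-in for AlreadyExistsException
      throw new IllegalStateException("Namespace already exists: " + namespace);
    }
    // Accept null or empty metadata as "no properties" instead of failing
    Map<String, String> properties = metadata == null ? Collections.emptyMap() : metadata;
    namespaces.put(namespace, new HashMap<>(properties));
  }

  boolean namespaceExists(String namespace) {
    return namespaces.containsKey(namespace);
  }
}
```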
--
This is an automated message from the Apache Git Service.