kbendick commented on a change in pull request #3275:
URL: https://github.com/apache/iceberg/pull/3275#discussion_r741594748
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,111 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size() - 1; i++) {
+ sqlStatement.append(", " + JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
Review comment:
Nit: Since you're already using the string builder, don't concatenate
the strings. Just call `.append` twice: once for the comma and once for the
value placeholders.
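A minimal sketch of the chained `.append` form, for illustration only. The class name, `INSERT_SQL`, and `VALUES_BASE` are hypothetical stand-ins for the `JdbcUtil` constants, and their values are assumed from the SQL quoted elsewhere in this review:

```java
class AppendTwiceSketch {
  // Assumed values; the real constants live in JdbcUtil.
  static final String INSERT_SQL =
      "INSERT INTO iceberg_namespace_properties (catalog_name, namespace, key, value) VALUES (?,?,?,?)";
  static final String VALUES_BASE = "(?,?,?,?)";

  // Same loop shape as the PR, but each piece gets its own append call,
  // so no intermediate String is built by concatenation.
  static String buildInsert(int propertyCount) {
    StringBuilder sqlStatement = new StringBuilder(INSERT_SQL);
    for (int i = 0; i < propertyCount - 1; i++) {
      sqlStatement.append(", ").append(VALUES_BASE);
    }
    return sqlStatement.toString();
  }
}
```

With three properties this yields the base statement followed by two more `(?,?,?,?)` groups.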
##########
File path: core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
##########
@@ -555,6 +556,66 @@ public void testDropNamespace() {
"is not empty. 1 tables exist.", () ->
catalog.dropNamespace(tbl4.namespace()));
}
+ @Test
+ public void testCreateNamespace() {
+ Namespace testNamespace = Namespace.of("testDb", "ns1", "ns2");
+ // Test with null metadata
+ AssertHelpers.assertThrows("Cannot create a namespace with null or empty
metadata", IllegalArgumentException.class,
+ () -> catalog.createNamespace(testNamespace, null));
+ Assert.assertFalse(catalog.namespaceExists(testNamespace));
Review comment:
Question for my own understanding:
In Spark (let's say), when we call `CREATE DATABASE default`, what metadata
gets added?
I see these two Spark statements in the tests. The first seems like it could
potentially have null or empty metadata.
```
sql("CREATE NAMESPACE %s", fullNamespace);
sql("CREATE NAMESPACE %s WITH PROPERTIES ('prop'='value')", fullNamespace);
```
Are we sure that metadata can't be empty, in any engine?
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -116,6 +119,19 @@ private void initializeCatalogTables() throws
InterruptedException, SQLException
LOG.debug("Creating table {} to store iceberg catalog",
JdbcUtil.CATALOG_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute();
});
+
+ connections.run(conn -> {
+ DatabaseMetaData dbMeta = conn.getMetaData();
+ ResultSet tableExists = dbMeta.getTables(null, null,
JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE, null);
+
+ if (tableExists.next()) {
+ return true;
Review comment:
Question: Please, correct me if I'm wrong, but is this call to
`getTables` using the actual CREATE TABLE DDL? Is there a reason that the full
create table DDL statement needs to be passed in here? Would just using the
resulting table name `JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME` work here?
Seems like a call to `getTables` would use the table name and not the full
create table DDL with the schema etc in it.
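To make the question concrete, here is a rough sketch of an existence check that passes the table name as the third argument of `DatabaseMetaData.getTables`, which JDBC treats as a table name pattern. The class and method names are hypothetical; in the PR the name would presumably come from `JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME`:

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;

class TableLookupSketch {
  // Returns true if the metadata lookup finds at least one table matching tableName.
  // The third argument of getTables is a name pattern, not a SQL statement.
  static boolean tableExists(Connection conn, String tableName) throws SQLException {
    DatabaseMetaData dbMeta = conn.getMetaData();
    try (ResultSet tables = dbMeta.getTables(null, null, tableName, null)) {
      return tables.next();
    }
  }
}
```

The try-with-resources block also closes the `ResultSet`. How the pattern is matched (case sensitivity, for example) depends on the database, so this is only a sketch.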
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,111 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size() - 1; i++) {
+ sqlStatement.append(", " + JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+ }
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+ sql.setString(rowIndex + 3, keyValue.getKey());
+ sql.setString(rowIndex + 4, keyValue.getValue());
+ rowIndex += 4;
+ }
+ return sql.executeUpdate();
+ }
+ });
+
+ if (insertedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}",
namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to insertProperties to
namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to insertProperties(namespace, properties)
Namespace: %s", namespace);
Review comment:
Question: Is this an expected occurrence or just being cautious?
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -116,6 +119,19 @@ private void initializeCatalogTables() throws
InterruptedException, SQLException
LOG.debug("Creating table {} to store iceberg catalog",
JdbcUtil.CATALOG_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute();
});
+
+ connections.run(conn -> {
+ DatabaseMetaData dbMeta = conn.getMetaData();
+ ResultSet tableExists = dbMeta.getTables(null, null,
JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE, null);
+
+ if (tableExists.next()) {
+ return true;
Review comment:
Nit: It's strange to see a `ResultSet` or any kind of iterator with a
boolean name. Due to my next comment, I'm not 100% sure about a better
alternative, but maybe `namespacePropertiesTableListing` or something might be
appropriate?
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,111 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size() - 1; i++) {
Review comment:
Nit: This took me quite some time to read and realize what was taking
place here. Possibly it could be cleaned up a bit to help readers?
`INSERT_NAMESPACE_PROPERTIES_SQL` is `INSERT INTO
iceberg_namespace_properties(catalog_name , namespace, key, value) VALUES
(?,?,?,?)`.
The code then iterates `properties.size() - 1` times, appending `,
(?,?,?,?)` each time.
So a few concerns:
1) What happens if this function is called when `properties` is empty? Is
that possible?
2) For readability, would it make sense to append the INSERT INTO
statement first, then use Google Guava's `Joiner` with `,` or a
repeater to join `properties.size()` placeholders?
I had to spend quite some time verifying the correctness of this and I'm
still not entirely sure (though I assume it's not going to be called with an
empty `properties` value).
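A sketch of the "repeater" idea, assuming the INSERT prefix is split from its placeholders. It uses the JDK's `Collections.nCopies` and `String.join` in place of Guava's `Joiner` so the snippet stays self-contained. `PlaceholderSketch`, `INSERT_PREFIX`, and `ROW` are hypothetical names, and the explicit check for an empty map is only one possible answer to concern 1:

```java
import java.util.Collections;

class PlaceholderSketch {
  // Assumed split of the INSERT statement; the real constants live in JdbcUtil.
  static final String INSERT_PREFIX =
      "INSERT INTO iceberg_namespace_properties (catalog_name, namespace, key, value) VALUES ";
  static final String ROW = "(?,?,?,?)";

  // Builds one placeholder group per property, joined by ", ".
  static String buildInsert(int propertyCount) {
    if (propertyCount < 1) {
      // With zero properties there is nothing to bind, so fail early.
      throw new IllegalArgumentException("Cannot build an INSERT for zero properties");
    }
    return INSERT_PREFIX + String.join(", ", Collections.nCopies(propertyCount, ROW));
  }
}
```

Written this way, the number of placeholder groups equals `properties.size()` directly, with no `size() - 1` loop bound to reason about.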
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,111 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size() - 1; i++) {
+ sqlStatement.append(", " + JdbcUtil.INSERT_PROPERTIES_VALUES_BASE);
+ }
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+ sql.setString(rowIndex + 3, keyValue.getKey());
+ sql.setString(rowIndex + 4, keyValue.getValue());
+ rowIndex += 4;
+ }
+ return sql.executeUpdate();
+ }
+ });
+
+ if (insertedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}",
namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to insertProperties to
namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to insertProperties(namespace, properties)
Namespace: %s", namespace);
+ } catch (SQLException e) {
+ throw new UncheckedSQLException(e, "Failed to insertProperties to
namespace: %s in catalog: %s", namespace,
+ catalogName);
+ }
+ }
+
+ private boolean updateProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int updatedRecords = connections.run(conn -> {
+ StringBuilder valueCases = new StringBuilder();
+ for (int i = 0; i < properties.size(); i++) {
+ valueCases.append(" \n " + JdbcUtil.UPDATE_PROPERTIES_VALUES_BASE);
+ }
+
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.UPDATE_NAMESPACE_PROPERTIES_SQL.replace("{}",
+ valueCases.toString()));
+
+ String values = String.join(",",
Collections.nCopies(properties.size(), String.valueOf('?')));
+ sqlStatement.append("(").append(values).append(")");
+
+ try (PreparedStatement sql =
conn.prepareStatement(sqlStatement.toString())) {
+ int rowIndex = 0;
+ for (Map.Entry<String, String> keyValue : properties.entrySet()) {
+ sql.setString(rowIndex + 1, keyValue.getKey());
+ sql.setString(rowIndex + 2, keyValue.getValue());
+ rowIndex += 2;
+ }
+ sql.setString(rowIndex + 1, catalogName);
+ sql.setString(rowIndex + 2, namespaceName);
+
+ rowIndex += 2;
+ for (String key : properties.keySet()) {
+ sql.setString(rowIndex + 1, key);
+ rowIndex += 1;
+ }
+
+ return sql.executeUpdate();
+ }
+ });
+
+ if (updatedRecords == properties.size()) {
+ LOG.debug("Successfully committed to new namespace: {}",
namespaceName);
+ return true;
+ } else {
+ throw new CommitFailedException("Failed to updateProperties to
namespace %s in catalog %s", namespaceName,
+ catalogName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e,
+ "Interrupted in call to updateProperties(namespace, properties)
Namespace: %s", namespace);
+ } catch (SQLException e) {
+ throw new UncheckedSQLException(e, "Failed to updateProperties to
namespace: %s in catalog: %s", namespace,
+ catalogName);
+ }
+ }
+
@Override
public void createNamespace(Namespace namespace, Map<String, String>
metadata) {
- throw new UnsupportedOperationException("Cannot create namespace " +
namespace +
- ": createNamespace is not supported");
+ if (namespaceExists(namespace)) {
+ throw new AlreadyExistsException("Namespace already exists: %s",
namespace);
+ }
+
+ if (metadata == null || metadata.isEmpty()) {
+ throw new IllegalArgumentException("Cannot create a namespace with null
or empty metadata");
+ }
Review comment:
Question: Are we sure that `metadata` can't be empty?
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -250,10 +266,111 @@ public void setConf(Configuration conf) {
this.conf = conf;
}
+ private boolean insertProperties(Namespace namespace, Map<String, String>
properties) {
+ String namespaceName = JdbcUtil.namespaceToString(namespace);
+
+ try {
+ int insertedRecords = connections.run(conn -> {
+ StringBuilder sqlStatement = new
StringBuilder(JdbcUtil.INSERT_NAMESPACE_PROPERTIES_SQL);
+
+ for (int i = 0; i < properties.size() - 1; i++) {
Review comment:
Yeah, I completely understand that formatting Strings like this is not as
straightforward in vanilla Java.
Check out the joiner. One simpler approach might be pushing a `(?,?,?,?)`
String into a collection for each property pair and then using the
`org.apache.iceberg.relocated.com.google.common.base.Joiner` on that collection
to get the final string of placeholders.
It's definitely not as convenient as Scala's `.mkString(",")` etc.
##########
File path: core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
##########
@@ -116,6 +119,19 @@ private void initializeCatalogTables() throws
InterruptedException, SQLException
LOG.debug("Creating table {} to store iceberg catalog",
JdbcUtil.CATALOG_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute();
});
+
+ connections.run(conn -> {
+ DatabaseMetaData dbMeta = conn.getMetaData();
+ ResultSet tableExists = dbMeta.getTables(null, null,
JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE, null);
+
+ if (tableExists.next()) {
+ return true;
Review comment:
So what you referenced in your response to me seems correct.
But the code isn't using `GET_NAMESPACE_SQL` in that `getTables` call. It's
using `CREATE_NAMESPACE_PROPERTIES_TABLE`, which is copied at the bottom.
I think you just need to update this line
https://github.com/apache/iceberg/pull/3275/files#diff-a5f7966c7493d463facbf7e76fe0fa3db1b637d29ee12e5b3628a5e5dd289e0aR125
to use `GET_NAMESPACE_SQL` like you mentioned.
```java
protected static final String CREATE_NAMESPACE_PROPERTIES_TABLE =
"CREATE TABLE " + NAMESPACE_PROPERTIES_TABLE_NAME +
"(" +
CATALOG_NAME + " VARCHAR(255) NOT NULL," +
NAMESPACE_NAME + " VARCHAR(255) NOT NULL," +
NAMESPACE_PROPERTY_KEY + " VARCHAR(5500)," +
NAMESPACE_PROPERTY_VALUE + " VARCHAR(5500)," +
"PRIMARY KEY (" + CATALOG_NAME + ", " + NAMESPACE_NAME + ", " +
NAMESPACE_PROPERTY_KEY + ")" +
")";
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.