singhpk234 commented on code in PR #1287: URL: https://github.com/apache/polaris/pull/1287#discussion_r2038629351
########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/models/ModelEntity.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc.models; + +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntitySubType; +import org.apache.polaris.core.entity.PolarisEntityType; + +public class ModelEntity { + // the id of the catalog associated to that entity. NULL_ID if this entity is top-level like Review Comment: The thing is, it's not NULL; it's actually 0. Updated the comment. ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.*; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Objects; +import javax.sql.DataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DatasourceOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(DatasourceOperations.class); + DataSource datasource; + + public DatasourceOperations(DataSource datasource) { + this.datasource = datasource; + } + + public void executeScript() { + ClassLoader classLoader = DatasourceOperations.class.getClassLoader(); + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement()) { + BufferedReader reader = + new BufferedReader( + new InputStreamReader( + Objects.requireNonNull(classLoader.getResourceAsStream("h2/schema-v1-h2.sql")), + UTF_8)); + StringBuilder sqlBuffer = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (!line.isEmpty() && !line.startsWith("--")) { // Ignore empty lines and comments + sqlBuffer.append(line).append("\n"); + if (line.endsWith(";")) { // Execute statement when semicolon is found + String sql = 
sqlBuffer.toString().trim(); + try { + int rowsUpdated = statement.executeUpdate(sql); + LOGGER.debug("Query {} executed {} rows affected", sql, rowsUpdated); + } catch (SQLException e) { + LOGGER.error("Error executing query {}", sql, e); + } + sqlBuffer.setLength(0); // Clear the buffer for the next statement + } + } + } + } catch (IOException e) { + LOGGER.error("Error reading the script file", e); + throw new RuntimeException(e); + } catch (SQLException e) { + LOGGER.error("Error executing the script file", e); + throw new RuntimeException(e); + } + } + + public <T> List<T> executeSelect(String query, Class<T> targetClass) { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement()) { + ResultSet s = statement.executeQuery(query); + List<T> results = ResultSetToObjectConverter.convert(s, targetClass); + return results.isEmpty() ? null : results; + } catch (Exception e) { + LOGGER.error("Error executing query {}", query, e); + throw new RuntimeException(e); + } + } + + public int executeUpdate(String query) { + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement()) { + return statement.executeUpdate(query); + } catch (SQLException e) { + LOGGER.error("Error executing query {}", query, e); + return -1; + } + } + + public int executeUpdate(String query, Statement statement) throws SQLException { Review Comment: We need this to do the exception handling, so that failures can be wrapped as AlreadyExists vs. Retry on concurrent update. ########## extension/persistence/relational-jdbc/build.gradle.kts: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { id("polaris-server") } + +dependencies { + implementation(project(":polaris-core")) + implementation(libs.slf4j.api) + + implementation(platform(libs.quarkus.bom)) + compileOnly("io.smallrye.common:smallrye-common-annotation") // @Identifier + compileOnly("io.smallrye.config:smallrye-config-core") // @ConfigMapping Review Comment: oops forgot to remove, corrected it. ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/PolarisJdbcBasePersistenceImpl.java: ########## @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.util.*; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.entity.*; +import org.apache.polaris.core.persistence.*; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; +import org.apache.polaris.core.storage.PolarisStorageIntegration; +import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelEntity; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelGrantRecord; +import org.apache.polaris.extension.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData; + +public class PolarisJdbcBasePersistenceImpl implements BasePersistence, IntegrationPersistence { + + private final DatasourceOperations datasourceOperations; + private final PrincipalSecretsGenerator secretsGenerator; + private final PolarisStorageIntegrationProvider storageIntegrationProvider; + + public PolarisJdbcBasePersistenceImpl( + DatasourceOperations databaseOperations, + PrincipalSecretsGenerator secretsGenerator, + PolarisStorageIntegrationProvider storageIntegrationProvider) { + this.datasourceOperations = databaseOperations; + this.secretsGenerator = secretsGenerator; + this.storageIntegrationProvider = storageIntegrationProvider; + } + + @Override + public long generateNewId(PolarisCallContext callCtx) { + return IdGenerator.getInstance().nextId(); + } + + @Override + public void writeEntity( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisBaseEntity entity, + boolean nameOrParentChanged, + PolarisBaseEntity originalEntity) { + ModelEntity modelEntity = ModelEntity.fromEntity(entity); + String query; + if (originalEntity == null) { + query 
= JdbcCrudQueryGenerator.generateInsertQuery(modelEntity, ModelEntity.class); + } else { + Map<String, Object> params = new HashMap<>(); + params.put("id", originalEntity.getId()); + params.put("catalog_id", originalEntity.getCatalogId()); + params.put("entity_version", originalEntity.getEntityVersion()); + query = JdbcCrudQueryGenerator.generateUpdateQuery(modelEntity, params, ModelEntity.class); + } + int rowsUpdated = datasourceOperations.executeUpdate(query); + if (rowsUpdated == 0) { + if (originalEntity == null) { + // bad interface. + throw new EntityAlreadyExistsException(entity); + } else { + throw new RetryOnConcurrencyException("CAS failed"); + } + } + } + + @Override + public void writeEntities( + PolarisCallContext callCtx, + List<PolarisBaseEntity> entities, + List<PolarisBaseEntity> originalEntities) { + try { + datasourceOperations.runWithinTransaction( + statement -> { + for (int i = 0; i < entities.size(); i++) { + PolarisBaseEntity entity = entities.get(i); + ModelEntity modelEntity = ModelEntity.fromEntity(entity); + + // first, check if the entity has already been created, in which case we will simply + // return it. + PolarisBaseEntity entityFound = + lookupEntity( + callCtx, entity.getCatalogId(), entity.getId(), entity.getTypeCode()); + if (entityFound != null) { + // probably the client retried, simply return it + // TODO: Check correctness of returning entityFound vs entity here. It may have + // already + // been updated after the creation. 
+ continue; + } + // lookup by name + EntityNameLookupRecord exists = + lookupEntityIdAndSubTypeByName( + callCtx, + entity.getCatalogId(), + entity.getParentId(), + entity.getTypeCode(), + entity.getName()); + if (exists != null) { + throw new EntityAlreadyExistsException(entity); + } + String query; + if (originalEntities == null || originalEntities.get(i) == null) { + query = JdbcCrudQueryGenerator.generateInsertQuery(modelEntity, ModelEntity.class); + } else { + // CAS + Map<String, Object> params = new HashMap<>(); + params.put("id", originalEntities.get(i).getId()); + params.put("catalog_id", originalEntities.get(i).getCatalogId()); + params.put("entity_version", originalEntities.get(i).getEntityVersion()); + query = + JdbcCrudQueryGenerator.generateUpdateQuery( + modelEntity, params, ModelEntity.class); + } + int rowsUpdated = datasourceOperations.executeUpdate(query, statement); + if (rowsUpdated == 0) { + if (originalEntities == null || originalEntities.get(i) == null) { + // bad interface. + throw new EntityAlreadyExistsException(entity); + } else { + throw new RetryOnConcurrencyException("CAS failed"); + } + } + } + return true; + }); + } catch (Exception e) { + if (e instanceof EntityAlreadyExistsException) { + throw (EntityAlreadyExistsException) e; + } Review Comment: Update the handling ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/models/ModelEntity.java: ########## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc.models; + +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntitySubType; +import org.apache.polaris.core.entity.PolarisEntityType; + +public class ModelEntity { + // the id of the catalog associated to that entity. NULL_ID if this entity is top-level like + // a catalog + private long catalogId; + + // the id of the entity which was resolved + private long id; + + // the id of the parent of this entity, use 0 for a top-level entity whose parent is the account + private long parentId; + + // the type of the entity when it was resolved + private int typeCode; + + // the name that this entity had when it was resolved + private String name; + + // the version that this entity had when it was resolved + private int entityVersion; + + // the type of the entity when it was resolved + private int subTypeCode; + + // timestamp when this entity was created + private long createTimestamp; + + // when this entity was dropped. Null if was never dropped + private long dropTimestamp; + + // when did we start purging this entity. 
When not null, un-drop is no longer possible + private long purgeTimestamp; + + // when should we start purging this entity + private long toPurgeTimestamp; + + // last time this entity was updated, only for troubleshooting + private long lastUpdateTimestamp; + + // properties, serialized as a JSON string + private String properties; + + // internal properties, serialized as a JSON string + private String internalProperties; + + // current version for that entity, will be monotonically incremented + private int grantRecordsVersion; + + public long getId() { + return id; + } + + public long getParentId() { + return parentId; + } + + public int getTypeCode() { + return typeCode; + } + + public String getName() { + return name; + } + + public int getEntityVersion() { + return entityVersion; + } + + public long getCatalogId() { + return catalogId; + } + + public int getSubTypeCode() { + return subTypeCode; + } + + public long getCreateTimestamp() { + return createTimestamp; + } + + public long getDropTimestamp() { + return dropTimestamp; + } + + public long getPurgeTimestamp() { + return purgeTimestamp; + } + + public long getToPurgeTimestamp() { + return toPurgeTimestamp; + } + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + public String getProperties() { + return properties != null ? properties : "{}"; + } + + public String getInternalProperties() { + return internalProperties != null ? 
internalProperties : "{}"; + } + + public int getGrantRecordsVersion() { + return grantRecordsVersion; + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private final ModelEntity entity; + + private Builder() { + entity = new ModelEntity(); + } + + public Builder catalogId(long catalogId) { + entity.catalogId = catalogId; + return this; + } + + public Builder id(long id) { + entity.id = id; + return this; + } + + public Builder parentId(long parentId) { + entity.parentId = parentId; + return this; + } + + public Builder typeCode(int typeCode) { + entity.typeCode = typeCode; + return this; + } + + public Builder name(String name) { + entity.name = name; + return this; + } + + public Builder entityVersion(int entityVersion) { + entity.entityVersion = entityVersion; + return this; + } + + public Builder subTypeCode(int subTypeCode) { + entity.subTypeCode = subTypeCode; + return this; + } + + public Builder createTimestamp(long createTimestamp) { + entity.createTimestamp = createTimestamp; + return this; + } + + public Builder dropTimestamp(long dropTimestamp) { + entity.dropTimestamp = dropTimestamp; + return this; + } + + public Builder purgeTimestamp(long purgeTimestamp) { + entity.purgeTimestamp = purgeTimestamp; + return this; + } + + public Builder toPurgeTimestamp(long toPurgeTimestamp) { + entity.toPurgeTimestamp = toPurgeTimestamp; + return this; + } + + public Builder lastUpdateTimestamp(long lastUpdateTimestamp) { + entity.lastUpdateTimestamp = lastUpdateTimestamp; + return this; + } + + public Builder properties(String properties) { + entity.properties = properties; + return this; + } + + public Builder internalProperties(String internalProperties) { + entity.internalProperties = internalProperties; + return this; + } + + public Builder grantRecordsVersion(int grantRecordsVersion) { + entity.grantRecordsVersion = grantRecordsVersion; + return this; + } + + public ModelEntity build() { + return 
entity; + } + } + + public static ModelEntity fromEntity(PolarisBaseEntity entity) { + return ModelEntity.builder() + .catalogId(entity.getCatalogId()) + .id(entity.getId()) + .parentId(entity.getParentId()) + .typeCode(entity.getTypeCode()) + .name(entity.getName()) + .entityVersion(entity.getEntityVersion()) + .subTypeCode(entity.getSubTypeCode()) + .createTimestamp(entity.getCreateTimestamp()) + .dropTimestamp(entity.getDropTimestamp()) + .purgeTimestamp(entity.getPurgeTimestamp()) + .toPurgeTimestamp(entity.getToPurgeTimestamp()) + .lastUpdateTimestamp(entity.getLastUpdateTimestamp()) + .properties(entity.getProperties()) + .internalProperties(entity.getInternalProperties()) + .grantRecordsVersion(entity.getGrantRecordsVersion()) + .build(); + } + + public static PolarisBaseEntity toEntity(ModelEntity model) { + if (model == null) { + return null; + } + + var entity = + new PolarisBaseEntity( + model.getCatalogId(), + model.getId(), + PolarisEntityType.fromCode(model.getTypeCode()), + PolarisEntitySubType.fromCode(model.getSubTypeCode()), Review Comment: Added IllegalArgument exception. ########## extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/DatasourceOperations.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extension.persistence.relational.jdbc; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.*; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Objects; +import javax.sql.DataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DatasourceOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(DatasourceOperations.class); + DataSource datasource; + + public DatasourceOperations(DataSource datasource) { + this.datasource = datasource; + } + + public void executeScript() { + ClassLoader classLoader = DatasourceOperations.class.getClassLoader(); + try (Connection connection = borrowConnection(); + Statement statement = connection.createStatement()) { + BufferedReader reader = + new BufferedReader( + new InputStreamReader( + Objects.requireNonNull(classLoader.getResourceAsStream("h2/schema-v1-h2.sql")), + UTF_8)); + StringBuilder sqlBuffer = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (!line.isEmpty() && !line.startsWith("--")) { // Ignore empty lines and comments + sqlBuffer.append(line).append("\n"); + if (line.endsWith(";")) { // Execute statement when semicolon is found + String sql = sqlBuffer.toString().trim(); Review Comment: added just as a safety check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org