This is an automated email from the ASF dual-hosted git repository. machristie pushed a commit to branch calcite in repository https://gitbox.apache.org/repos/asf/airavata-data-catalog.git
commit 8a148e803559c5164dddf71201b7356b0db6bac7 Author: Marcus Christie <[email protected]> AuthorDate: Tue Mar 21 10:53:49 2023 -0400 Implement basic data product search over metadata schemas --- .../api/client/DataCatalogAPIClient.java | 47 ++++- data-catalog-api/server/pom.xml | 5 + .../datacatalog/api/query/MetadataQueryParser.java | 147 -------------- .../api/query/MetadataSchemaQueryExecutor.java | 6 + .../api/query/MetadataSchemaQueryResult.java | 9 + .../api/query/MetadataSchemaQueryWriter.java | 21 ++ .../impl/MetadataSchemaQueryExecutorImpl.java | 214 +++++++++++++++++++++ .../PostgresqlMetadataSchemaQueryWriterImpl.java | 201 +++++++++++++++++++ .../api/service/DataCatalogAPIService.java | 39 ++-- .../api/service/DataCatalogService.java | 3 + .../api/service/impl/DataCatalogServiceImpl.java | 34 ++-- .../server/src/main/resources/logback.xml | 4 + .../stubs/src/main/proto/DataCatalogAPI.proto | 42 ++-- 13 files changed, 582 insertions(+), 190 deletions(-) diff --git a/data-catalog-api/client/src/main/java/org/apache/airavata/datacatalog/api/client/DataCatalogAPIClient.java b/data-catalog-api/client/src/main/java/org/apache/airavata/datacatalog/api/client/DataCatalogAPIClient.java index f01554e..794147b 100644 --- a/data-catalog-api/client/src/main/java/org/apache/airavata/datacatalog/api/client/DataCatalogAPIClient.java +++ b/data-catalog-api/client/src/main/java/org/apache/airavata/datacatalog/api/client/DataCatalogAPIClient.java @@ -16,6 +16,8 @@ import org.apache.airavata.datacatalog.api.DataProductGetRequest; import org.apache.airavata.datacatalog.api.DataProductGetResponse; import org.apache.airavata.datacatalog.api.DataProductRemoveFromMetadataSchemaRequest; import org.apache.airavata.datacatalog.api.DataProductRemoveFromMetadataSchemaResponse; +import org.apache.airavata.datacatalog.api.DataProductSearchRequest; +import org.apache.airavata.datacatalog.api.DataProductSearchResponse; import org.apache.airavata.datacatalog.api.DataProductUpdateRequest; import org.apache.airavata.datacatalog.api.DataProductUpdateResponse; import org.apache.airavata.datacatalog.api.FieldValueType; @@ -133,6 +135,12 @@ public class DataCatalogAPIClient { return response.getDataProduct(); } + public List<DataProduct> searchDataProducts(String sql) { + DataProductSearchRequest request = DataProductSearchRequest.newBuilder().setSql(sql).build(); + DataProductSearchResponse response = blockingStub.searchDataProducts(request); + return response.getDataProductsList(); + } + public static void main(String[] args) throws InterruptedException { String target = "localhost:6565"; @@ -211,6 +219,21 @@ public class DataCatalogAPIClient { field2.getFieldName(), field2.getSchemaName())); } + MetadataSchemaField field3 = MetadataSchemaField.newBuilder().setFieldName("field3") + .setJsonPath("$.field3").setValueType(FieldValueType.STRING) + .setSchemaName(metadataSchema.getSchemaName()).build(); + MetadataSchemaField field3Exists = client.getMetadataSchemaField(field3.getSchemaName(), + field3.getFieldName()); + if (field3Exists == null) { + field3 = client.createMetadataSchemaField(field3); + System.out.println(MessageFormat.format("Created metadata schema field [{0}] in schema [{1}]", + field3.getFieldName(), field3.getSchemaName())); + } else { + field3 = field3Exists; + System.out.println(MessageFormat.format("Found metadata schema field [{0}] in schema [{1}]", + field3.getFieldName(), field3.getSchemaName())); + } + List<MetadataSchemaField> fields = client.getMetadataSchemaFields(metadataSchema.getSchemaName()); System.out.println(MessageFormat.format("Found {0} fields for schema {1}", fields.size(), metadataSchema.getSchemaName())); @@ -230,7 +253,7 @@ public class DataCatalogAPIClient { // Create data product that belongs to my_schema schema DataProduct dataProduct3 = DataProduct.newBuilder() .setName("testing 3") - .setMetadata("{\"foo\": \"bar\"}") + .setMetadata("{\"field3\": \"bar\", \"field1\": 10}") .addMetadataSchemas("my_schema") .build(); DataProduct result3 = client.createDataProduct(dataProduct3); @@ -238,6 +261,28 @@ public class DataCatalogAPIClient { MessageFormat.format("Created third data product [{0}], supporting schemas [{1}]", result3.getDataProductId(), result3.getMetadataSchemasList())); + // Create another data product that belongs to my_schema schema, but with + // different "field3" and "field1" values + DataProduct dataProduct4 = DataProduct.newBuilder() + .setName("testing 4") + .setMetadata("{\"field3\": \"baz\", \"field1\": 2}") + .addMetadataSchemas("my_schema") + .build(); + client.createDataProduct(dataProduct4); + + List<DataProduct> searchResults = client.searchDataProducts(""" + select * from my_schema where field3 = 'bar' + """); + System.out.println(searchResults); + + searchResults = client.searchDataProducts(""" + select * from my_schema where (field1 < 5 or field3 = 'bar') and field1 > 0 + and external_id = 'fff' + """); + // searchResults = client.searchDataProducts(""" + // select * from my_schema where not (field1 < 5 or field3 = 'bar') + // """); + System.out.println(searchResults); } finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } diff --git a/data-catalog-api/server/pom.xml b/data-catalog-api/server/pom.xml index cdaf834..e2318e9 100644 --- a/data-catalog-api/server/pom.xml +++ b/data-catalog-api/server/pom.xml @@ -43,6 +43,11 @@ <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-devtools</artifactId> + <optional>true</optional> + </dependency> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> diff --git a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/MetadataQueryParser.java b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/MetadataQueryParser.java deleted file mode 100644 index 995497e..0000000 --- a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/MetadataQueryParser.java +++ /dev/null @@ -1,147 +0,0 @@ -package org.apache.airavata.datacatalog.api.query; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Arrays; -import java.util.Collections; - -import javax.sql.DataSource; - -import org.apache.calcite.adapter.jdbc.JdbcSchema; -import org.apache.calcite.jdbc.CalciteConnection; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.RelRoot; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.impl.ViewTable; -import org.apache.calcite.sql.SqlExplainFormat; -import org.apache.calcite.sql.SqlExplainLevel; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.dialect.PostgresqlSqlDialect; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.sql.parser.SqlParser; -import org.apache.calcite.tools.FrameworkConfig; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.tools.Planner; -import org.apache.calcite.tools.RelConversionException; -import org.apache.calcite.tools.RelRunner; -import org.apache.calcite.tools.ValidationException; - -public class MetadataQueryParser { - private static final String POSTGRESQL_SCHEMA = "PUBLIC"; - - public static void main(String[] args) - throws SqlParseException, SQLException, ValidationException, RelConversionException { - // SqlParser parser = SqlParser.create( - // "SELECT * FROM smilesdb WHERE created_date > '2020-01-01' AND absorb < 300.0 - // ORDER BY created_date desc LIMIT 10"); - // SqlNode query = parser.parseQuery(); - // System.out.println(query.toSqlString(PostgresqlSqlDialect.DEFAULT)); - - // Properties props = new Properties(); - // props.setProperty("currentSchema", "data_catalog"); - // props.setProperty("password", "secret"); - // props.setProperty("ssl", "true"); - // Connection connection = DriverManager.getConnection("jdbc:calcite:", props); - Connection connection = DriverManager.getConnection("jdbc:calcite:"); - // Set the default schema for the connection - // connection.setSchema("data_catalog"); - // Unwrap our connection using the CalciteConnection - CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); - // calciteConnection.setSchema("data_catalog"); - - // Get a pointer to our root schema for our Calcite Connection - SchemaPlus rootSchema = calciteConnection.getRootSchema(); - // SchemaPlus rootSchema = Frameworks.createRootSchema(false); - - // Instantiate a data source, this can be autowired in using Spring as well - DataSource postgresDataSource = JdbcSchema.dataSource( - "jdbc:postgresql://localhost/data_catalog", - "org.postgresql.Driver", // Change this if you want to use something like MySQL, Oracle, - // etc. - "postgres", // username - "example" // password - ); - // JdbcSchema jdbcSchema = JdbcSchema.create(rootSchema, POSTGRESQL_SCHEMA, - // postgresDataSource, null, null); - // System.out.println("tables: " + jdbcSchema.getTableNames()); - // rootSchema.add(POSTGRESQL_SCHEMA, - // JdbcSchema.create(rootSchema, POSTGRESQL_SCHEMA, postgresDataSource, null, - // null)); - // rootSchema.add("data_product", jdbcSchema.getTable("data_product")); - SchemaPlus dcSchema = rootSchema.add("data_catalog", - JdbcSchema.create(rootSchema, "data_catalog", postgresDataSource, null, null)); - dcSchema.add("smilesdb", ViewTable.viewMacro(dcSchema, - "select data_product_id, parent_data_product_id, external_id, metadata, name from \"data_catalog\".\"data_product\" where jsonb_path_exists(\"metadata\", '$.foo == \"bar\"')", - Collections.emptyList(), Arrays.asList("data_catalog", "smilesdb"), false)); - - System.out.println("subschemas: " + rootSchema.getSubSchemaNames()); - - FrameworkConfig config = Frameworks.newConfigBuilder() - .defaultSchema(rootSchema) - .parserConfig(PostgresqlSqlDialect.DEFAULT.configureParser(SqlParser.config())) - // .parserConfig(SqlParser.Config.DEFAULT.withCaseSensitive(false).withQuoting(Quoting.)) - .build(); - // String query = "SELECT * FROM public.\"data_product\" WHERE - // \"data_product_id\" = 1"; - // String query = "SELECT * FROM data_catalog.data_product WHERE data_product_id - // = 1"; - // String query = "SELECT * FROM data_product WHERE data_product_id = 1"; - // "smilesdb" view macro - String query = "SELECT * FROM data_catalog.smilesdb WHERE data_product_id = 1"; - Planner planner = Frameworks.getPlanner(config); - SqlNode parse = planner.parse(query); - - // Change unqualified table referenced into qualified - // final SqlNode transformedNode = parse.accept( - // new SqlShuttle() { - // @Override - // public SqlNode visit(SqlIdentifier id) { - // System.out.println("id: " + id); - // System.out.println("id.names: " + id.names); - // if (id.names.equals(Collections.singletonList("data_product"))) { - // return new SqlIdentifier(Arrays.asList("data_catalog", "data_product"), - // id.getParserPosition()); - // } - // return id; - // } - // }); - System.out.println(parse.toSqlString(PostgresqlSqlDialect.DEFAULT)); - // System.out.println(parse.toString()); - - SqlNode validate = planner.validate(parse); - RelRoot relRoot = planner.rel(validate); - // RelNode rel = relRoot.project(); - RelNode rel = relRoot.rel; - System.out.println(RelOptUtil.dumpPlan("", rel, SqlExplainFormat.TEXT, SqlExplainLevel.EXPPLAN_ATTRIBUTES)); - - final RelRunner runner = connection.unwrap(RelRunner.class); - PreparedStatement ps = runner.prepareStatement(rel); - - ps.executeQuery(); - - ResultSet resultSet = ps.getResultSet(); - - final StringBuilder buf = new StringBuilder(); - - while (resultSet.next()) { - - int columnCount = resultSet.getMetaData().getColumnCount(); - - for (int i = 1; i <= columnCount; i++) { - - buf.append(i > 1 ? "; " : "") - .append(resultSet.getMetaData().getColumnLabel(i)) - .append("=") - .append(resultSet.getObject(i)); - } - - System.out.println("Entry: " + buf.toString()); - - buf.setLength(0); - } - } -} diff --git a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/MetadataSchemaQueryExecutor.java b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/MetadataSchemaQueryExecutor.java new file mode 100644 index 0000000..6871553 --- /dev/null +++ b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/MetadataSchemaQueryExecutor.java @@ -0,0 +1,6 @@ +package org.apache.airavata.datacatalog.api.query; + +public interface MetadataSchemaQueryExecutor { + + MetadataSchemaQueryResult execute(String sql); +} diff --git a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/MetadataSchemaQueryResult.java b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/MetadataSchemaQueryResult.java new file mode 100644 index 0000000..9d2c0f6 --- /dev/null +++ b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/MetadataSchemaQueryResult.java @@ -0,0 +1,9 @@ +package org.apache.airavata.datacatalog.api.query; + +import java.util.List; + +import org.apache.airavata.datacatalog.api.DataProduct; + +public record MetadataSchemaQueryResult(List<DataProduct> dataProducts) { + +} diff --git a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/MetadataSchemaQueryWriter.java b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/MetadataSchemaQueryWriter.java new file mode 100644 index 0000000..08ab745 --- /dev/null +++ b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/MetadataSchemaQueryWriter.java @@ -0,0 +1,21 @@ +package org.apache.airavata.datacatalog.api.query; + +import java.util.Collection; +import java.util.Map; + +import org.apache.airavata.datacatalog.api.model.MetadataSchemaEntity; +import org.apache.calcite.sql.SqlNode; + +public interface MetadataSchemaQueryWriter { + + /** + * Rewrite the query as needed to filter against metadata schema fields. + * + * @param sqlNode + * @param metadataSchemas + * @param tableAliases + * @return + */ + String rewriteQuery(SqlNode sqlNode, Collection<MetadataSchemaEntity> metadataSchemas, + Map<String, String> tableAliases); +} diff --git a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/impl/MetadataSchemaQueryExecutorImpl.java b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/impl/MetadataSchemaQueryExecutorImpl.java new file mode 100644 index 0000000..ab1202d --- /dev/null +++ b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/impl/MetadataSchemaQueryExecutorImpl.java @@ -0,0 +1,214 @@ +package org.apache.airavata.datacatalog.api.query.impl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.airavata.datacatalog.api.DataProduct; +import org.apache.airavata.datacatalog.api.FieldValueType; +import org.apache.airavata.datacatalog.api.mapper.DataProductMapper; +import org.apache.airavata.datacatalog.api.model.DataProductEntity; +import org.apache.airavata.datacatalog.api.model.MetadataSchemaEntity; +import org.apache.airavata.datacatalog.api.model.MetadataSchemaFieldEntity; +import org.apache.airavata.datacatalog.api.query.MetadataSchemaQueryExecutor; +import org.apache.airavata.datacatalog.api.query.MetadataSchemaQueryResult; +import org.apache.airavata.datacatalog.api.query.MetadataSchemaQueryWriter; +import org.apache.airavata.datacatalog.api.repository.MetadataSchemaRepository; +import org.apache.calcite.avatica.util.Casing; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactory.Builder; +import org.apache.calcite.runtime.CalciteContextException; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.util.SqlShuttle; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import jakarta.persistence.EntityManager; + +@Component +public class MetadataSchemaQueryExecutorImpl implements MetadataSchemaQueryExecutor { + + private static final Logger logger = LoggerFactory.getLogger(MetadataSchemaQueryExecutorImpl.class); + + @Autowired + MetadataSchemaRepository metadataSchemaRepository; + + @Autowired + MetadataSchemaQueryWriter metadataSchemaQueryWriter; + + @Autowired + EntityManager entityManager; + + @Autowired + DataProductMapper dataProductMapper; + + @Override + public MetadataSchemaQueryResult execute(String sql) { + + // Create a schema that contains the data_product table and all of the metadata + // schemas + SchemaPlus schema = Frameworks.createRootSchema(true); + schema.add("data_product", new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + Builder builder = (Builder) typeFactory.builder(); + return builder + .add("data_product_id", SqlTypeName.INTEGER) + .add("parent_data_product_id", SqlTypeName.INTEGER) + .add("external_id", SqlTypeName.VARCHAR) + .add("metadata", SqlTypeName.OTHER) + .build(); + } + }); + + // TODO: limit by tenant id + List<MetadataSchemaEntity> metadataSchemas = metadataSchemaRepository.findAll(); + for (MetadataSchemaEntity metadataSchema : metadataSchemas) { + + schema.add(metadataSchema.getSchemaName(), new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + Builder builder = (Builder) typeFactory.builder(); + + // Add all of the common fields + builder.add("data_product_id", SqlTypeName.INTEGER) + .add("parent_data_product_id", SqlTypeName.INTEGER) + .add("external_id", SqlTypeName.VARCHAR) + .add("metadata", SqlTypeName.OTHER); + + // Add all of the schema specific metadata fields + for (MetadataSchemaFieldEntity metadataSchemaField : metadataSchema.getMetadataSchemaFields()) { + builder.add(metadataSchemaField.getFieldName(), + getSqlTypeName(metadataSchemaField.getFieldValueType())); + } + + return builder.build(); + } + }); + } + + FrameworkConfig config = Frameworks.newConfigBuilder() + .defaultSchema(schema) + .parserConfig(SqlParser.Config.DEFAULT.withUnquotedCasing(Casing.TO_LOWER)) + .build(); + Planner planner = Frameworks.getPlanner(config); + + SqlNode sqlNode = parse(planner, sql); + + SqlValidator validator = getValidator(schema, config, planner); + + // Validate the query + SqlNode validatedSqlNode = validate(validator, sqlNode); + + // create a mapping of table aliases to actual tables (metadata schemas) + // For example, if query is of the form "select * from smilesdb as sm", then + // create a mapping from sm -> smilesdb + + // TODO: may not be a SqlSelect, might be an OrderBy for example + SqlSelect selectNode = (SqlSelect) validatedSqlNode; + Map<String, String> tableAliases = new HashMap<>(); + selectNode.getFrom().accept(new SqlShuttle() { + + @Override + public SqlNode visit(SqlCall call) { + if (call.isA(Collections.singleton(SqlKind.AS))) { + + SqlIdentifier first = call.operand(0); + SqlIdentifier second = call.operand(1); + tableAliases.put(second.getSimple(), first.getSimple()); + } + return super.visit(call); + } + + }); + + String finalSql = metadataSchemaQueryWriter.rewriteQuery(validatedSqlNode, metadataSchemas, tableAliases); + logger.debug("Metadata schema query final sql: {}", finalSql); + + List<DataProductEntity> dataProductEntities = entityManager.createNativeQuery(finalSql, DataProductEntity.class) + .getResultList(); + + List<DataProduct> dataProducts = new ArrayList<>(); + for (DataProductEntity dataProductEntity : dataProductEntities) { + + org.apache.airavata.datacatalog.api.DataProduct.Builder dpBuilder = DataProduct.newBuilder(); + dataProductMapper.mapEntityToModel(dataProductEntity, dpBuilder); + dataProducts.add(dpBuilder.build()); + } + + return new MetadataSchemaQueryResult(dataProducts); + } + + private SqlValidator getValidator(SchemaPlus schema, FrameworkConfig config, Planner planner) { + CalciteConnectionConfig connectionConfig = CalciteConnectionConfig.DEFAULT; + CalciteCatalogReader catalogReader = new CalciteCatalogReader(CalciteSchema.from(schema), + CalciteSchema.from(schema).path(null), + planner.getTypeFactory(), connectionConfig); + + SqlValidator validator = SqlValidatorUtil.newValidator(SqlStdOperatorTable.instance(), + catalogReader, planner.getTypeFactory(), + config.getSqlValidatorConfig().withIdentifierExpansion(false)); + return validator; + } + + SqlNode parse(Planner planner, String sql) { + try { + return planner.parse(sql); + } catch (SqlParseException e) { + // TODO: convert to a checked exception and add to interface + throw new RuntimeException(e); + } + } + + SqlNode validate(SqlValidator validator, SqlNode sqlNode) { + try { + return validator.validate(sqlNode); + } catch (CalciteContextException e) { + // TODO: convert to a checked exception and add to interface + throw new RuntimeException(e); + } + } + + private SqlTypeName getSqlTypeName(FieldValueType fieldValueType) { + + switch (fieldValueType) { + case BOOLEAN: + return SqlTypeName.BOOLEAN; + case FLOAT: + return SqlTypeName.FLOAT; + case INTEGER: + return SqlTypeName.INTEGER; + case STRING: + return SqlTypeName.VARCHAR; + case DATESTRING: + return SqlTypeName.TIMESTAMP; + default: + throw new RuntimeException( + "Unexpected fieldValueType, unable to convert to SqlTypeName: " + fieldValueType); + } + } + +} diff --git a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/impl/PostgresqlMetadataSchemaQueryWriterImpl.java b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/impl/PostgresqlMetadataSchemaQueryWriterImpl.java new file mode 100644 index 0000000..8ace28a --- /dev/null +++ b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/query/impl/PostgresqlMetadataSchemaQueryWriterImpl.java @@ -0,0 +1,201 @@ +package org.apache.airavata.datacatalog.api.query.impl; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.airavata.datacatalog.api.model.MetadataSchemaEntity; +import org.apache.airavata.datacatalog.api.model.MetadataSchemaFieldEntity; +import org.apache.airavata.datacatalog.api.query.MetadataSchemaQueryWriter; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.dialect.PostgresqlSqlDialect; +import org.apache.calcite.sql.util.SqlShuttle; +import org.springframework.stereotype.Component; + +@Component +public class PostgresqlMetadataSchemaQueryWriterImpl implements MetadataSchemaQueryWriter { + + private static final class MetadataSchemaFieldFilterRewriter extends SqlShuttle { + + final Collection<MetadataSchemaEntity> metadataSchemas; + final Map<String, String> tableAliases; + final StringBuilder sql = new StringBuilder(); + // Maintain queue of binary logical operators so we know when to + // open/close parentheses and when to add "AND" and "OR" to the query + Deque<SqlCall> binaryLogicalOperatorNodes = new ArrayDeque<>(); + + MetadataSchemaFieldFilterRewriter(Collection<MetadataSchemaEntity> metadataSchemas, + Map<String, String> tableAliases) { + this.metadataSchemas = metadataSchemas; + this.tableAliases = tableAliases; + } + + MetadataSchemaFieldEntity resolveMetadataSchemaField(SqlIdentifier sqlIdentifier) { + + MetadataSchemaEntity metadataSchema = null; + String fieldName = null; + if (sqlIdentifier.names.size() == 2) { + String tableName = sqlIdentifier.names.get(0); + metadataSchema = resolveMetadataSchema(tableName); + fieldName = sqlIdentifier.names.get(1); + } else if (sqlIdentifier.names.size() == 1) { + // TODO: just pick the first one, but in general we would need + // to look through all of the metadata schemas to find the one + // that this field belongs to + metadataSchema = this.metadataSchemas.iterator().next(); + fieldName = sqlIdentifier.names.get(0); + } else { + throw new RuntimeException("Unexpected sqlIdentifier: " + sqlIdentifier); + } + + for (MetadataSchemaFieldEntity metadataSchemaField : metadataSchema.getMetadataSchemaFields()) { + if (metadataSchemaField.getFieldName().equals(fieldName)) { + return metadataSchemaField; + } + } + // If none matched, must not be a metadata schema field + return null; + } + + MetadataSchemaEntity resolveMetadataSchema(String tableOrAliasName) { + String tableName = tableOrAliasName; + if (this.tableAliases.containsKey(tableOrAliasName)) { + tableName = this.tableAliases.get(tableOrAliasName); + } + return findMetadataSchema(tableName); + } + + MetadataSchemaEntity findMetadataSchema(String schemaName) { + for (MetadataSchemaEntity metadataSchema : this.metadataSchemas) { + if (metadataSchema.getSchemaName().equals(schemaName)) { + return metadataSchema; + } + } + return null; + } + + public String finalizeSql() { + while (!this.binaryLogicalOperatorNodes.isEmpty()) { + this.binaryLogicalOperatorNodes.pop(); + this.sql.append(" ) "); + } + return this.sql.toString(); + } + + @Override + public SqlNode visit(SqlCall call) { + SqlCall currentOperator = this.binaryLogicalOperatorNodes.peek(); + while (currentOperator != null + && !call.getParserPosition().overlaps(currentOperator.getParserPosition())) { + this.binaryLogicalOperatorNodes.remove(); + currentOperator = this.binaryLogicalOperatorNodes.peek(); + this.sql.append(" ) "); + this.sql.append(currentOperator.getOperator().toString()); + this.sql.append(" "); + } + if (call.getKind() == SqlKind.NOT) { + this.sql.append(" NOT "); + } else if (call.getKind() == SqlKind.AND || call.getKind() == SqlKind.OR) { + this.binaryLogicalOperatorNodes.push(call); + this.sql.append("( "); + } else { + SqlNode sqlNode = call.getOperandList().get(0); + // TODO: this assumes that there would only ever be one metadata schema field + // and that it comes first and the second operand is a literal + if (sqlNode.isA(Set.of(SqlKind.IDENTIFIER))) { + SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode; + MetadataSchemaFieldEntity metadataSchemaField = resolveMetadataSchemaField(sqlIdentifier); + if (metadataSchemaField != null) { + // TODO: assuming an alias + sql.append(sqlIdentifier.names.get(0)); + sql.append("."); + sql.append("metadata @@ '"); + sql.append(metadataSchemaField.getJsonPath()); + sql.append(" "); + switch (call.getOperator().kind) { + case EQUALS: + sql.append(" == "); + break; + default: + sql.append(call.getOperator().kind.sql); + break; + } + sql.append(call.getOperandList().get(1).toSqlString(new PostgresqlSqlDialect( + PostgresqlSqlDialect.DEFAULT_CONTEXT.withLiteralQuoteString("\"")))); + sql.append("'"); + } else { + sql.append(call.toSqlString(PostgresqlSqlDialect.DEFAULT)); + } + } + + if (currentOperator != null && !(call.getParserPosition().getEndColumnNum() == currentOperator + .getParserPosition().getEndColumnNum() + && call.getParserPosition().getEndLineNum() == currentOperator.getParserPosition() + .getEndLineNum())) { + sql.append(" "); + sql.append(currentOperator.getOperator().toString()); + sql.append(" "); + } + } + return super.visit(call); + } + } + + @Override + public String rewriteQuery(SqlNode sqlNode, Collection<MetadataSchemaEntity> metadataSchemas, + Map<String, String> tableAliases) { + StringBuilder sb = new StringBuilder(); + + sb.append(writeCommonTableExpressions(metadataSchemas)); + sb.append(" SELECT * FROM "); + sb.append(((SqlSelect) sqlNode).getFrom().toSqlString(PostgresqlSqlDialect.DEFAULT)); + sb.append(" WHERE "); + sb.append(rewriteWhereClauseFilters(sqlNode, metadataSchemas, tableAliases)); + return sb.toString(); + } + + private String rewriteWhereClauseFilters(SqlNode sqlNode, Collection<MetadataSchemaEntity> metadataSchemas, + Map<String, String> tableAliases) { + MetadataSchemaFieldFilterRewriter filterRewriter = new MetadataSchemaFieldFilterRewriter(metadataSchemas, + tableAliases); + sqlNode.accept(filterRewriter); + return filterRewriter.finalizeSql(); + } + + String writeCommonTableExpressions(Collection<MetadataSchemaEntity> metadataSchemas) { + StringBuilder sb = new StringBuilder(); + List<String> commonTableExpressions = new ArrayList<>(); + for (MetadataSchemaEntity metadataSchema : metadataSchemas) { + commonTableExpressions.add(writeCommonTableExpression(metadataSchema)); + } + sb.append("WITH "); + sb.append(String.join(", ", commonTableExpressions)); + return sb.toString(); + } + + String writeCommonTableExpression(MetadataSchemaEntity metadataSchemaEntity) { + + StringBuilder sb = new StringBuilder(); + sb.append(metadataSchemaEntity.getSchemaName()); + sb.append(" AS ("); + sb.append("select dp_.data_product_id, dp_.parent_data_product_id, dp_.external_id, dp_.name, dp_.metadata "); + // for (MetadataSchemaFieldEntity field : + // metadataSchemaEntity.getMetadataSchemaFields()) { + // TODO: include each field as well? + // } + sb.append("from data_product dp_ "); + sb.append("inner join data_product_metadata_schema dpms_ on dpms_.data_product_id = dp_.data_product_id "); + sb.append("inner join metadata_schema ms_ on ms_.metadata_schema_id = dpms_.metadata_schema_id "); + sb.append("where ms_.metadata_schema_id = " + metadataSchemaEntity.getMetadataSchemaId()); + sb.append(")"); + return sb.toString(); + } +} diff --git a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/service/DataCatalogAPIService.java b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/service/DataCatalogAPIService.java index b5686a2..1e87293 100644 --- a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/service/DataCatalogAPIService.java +++ b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/service/DataCatalogAPIService.java @@ -14,6 +14,8 @@ import org.apache.airavata.datacatalog.api.DataProductGetRequest; import org.apache.airavata.datacatalog.api.DataProductGetResponse; import org.apache.airavata.datacatalog.api.DataProductRemoveFromMetadataSchemaRequest; import org.apache.airavata.datacatalog.api.DataProductRemoveFromMetadataSchemaResponse; +import org.apache.airavata.datacatalog.api.DataProductSearchRequest; +import org.apache.airavata.datacatalog.api.DataProductSearchResponse; import org.apache.airavata.datacatalog.api.DataProductUpdateRequest; import org.apache.airavata.datacatalog.api.DataProductUpdateResponse; import org.apache.airavata.datacatalog.api.MetadataSchema; @@ -35,6 +37,7 @@ import org.apache.airavata.datacatalog.api.MetadataSchemaFieldUpdateResponse; import org.apache.airavata.datacatalog.api.MetadataSchemaGetRequest; import org.apache.airavata.datacatalog.api.MetadataSchemaGetResponse; import org.apache.airavata.datacatalog.api.exception.EntityNotFoundException; +import org.apache.airavata.datacatalog.api.query.MetadataSchemaQueryResult; import org.lognet.springboot.grpc.GRpcService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,6 +108,29 @@ public class DataCatalogAPIService extends DataCatalogAPIServiceGrpc.DataCatalog responseObserver.onCompleted(); } + @Override + public void removeDataProductFromMetadataSchema(DataProductRemoveFromMetadataSchemaRequest request, + StreamObserver<DataProductRemoveFromMetadataSchemaResponse> responseObserver) { + + String dataProductId = request.getDataProductId(); + String schemaName = request.getSchemaName(); + DataProduct dataProduct = dataCatalogService.removeDataProductFromMetadataSchema(dataProductId, schemaName); + + responseObserver + .onNext(DataProductRemoveFromMetadataSchemaResponse.newBuilder().setDataProduct(dataProduct).build()); + responseObserver.onCompleted(); + } + + @Override + public void searchDataProducts(DataProductSearchRequest request, + StreamObserver<DataProductSearchResponse> responseObserver) { + + MetadataSchemaQueryResult searchResult = dataCatalogService.searchDataProducts(request.getSql()); + List<DataProduct> dataProducts = searchResult.dataProducts(); + responseObserver.onNext(DataProductSearchResponse.newBuilder().addAllDataProducts(dataProducts).build()); + responseObserver.onCompleted(); + } + @Override public void getMetadataSchema(MetadataSchemaGetRequest request, StreamObserver<MetadataSchemaGetResponse> responseObserver) { @@ -187,19 +213,6 @@ public class DataCatalogAPIService extends DataCatalogAPIServiceGrpc.DataCatalog responseObserver.onCompleted(); } - @Override - public void removeDataProductFromMetadataSchema(DataProductRemoveFromMetadataSchemaRequest request, - StreamObserver<DataProductRemoveFromMetadataSchemaResponse> responseObserver) { - - String dataProductId = request.getDataProductId(); - String schemaName = request.getSchemaName(); - DataProduct dataProduct = dataCatalogService.removeDataProductFromMetadataSchema(dataProductId, schemaName); - - responseObserver - .onNext(DataProductRemoveFromMetadataSchemaResponse.newBuilder().setDataProduct(dataProduct).build()); - responseObserver.onCompleted(); - } - @Override public void updateMetadataSchemaField(MetadataSchemaFieldUpdateRequest request, StreamObserver<MetadataSchemaFieldUpdateResponse> responseObserver) { diff --git a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/service/DataCatalogService.java b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/service/DataCatalogService.java index 2a41ead..093263a 100644 --- a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/service/DataCatalogService.java +++ b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/service/DataCatalogService.java @@ -5,6 +5,7 @@ import java.util.List; import org.apache.airavata.datacatalog.api.DataProduct; import org.apache.airavata.datacatalog.api.MetadataSchema; import org.apache.airavata.datacatalog.api.MetadataSchemaField; +import org.apache.airavata.datacatalog.api.query.MetadataSchemaQueryResult; /** * Transactional service layer for CRUD operations on data catalog database. @@ -38,4 +39,6 @@ public interface DataCatalogService { DataProduct removeDataProductFromMetadataSchema(String dataProductId, String schemaName); MetadataSchemaField updateMetadataSchemaField(MetadataSchemaField metadataSchemaField); + + MetadataSchemaQueryResult searchDataProducts(String sql); } diff --git a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/service/impl/DataCatalogServiceImpl.java b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/service/impl/DataCatalogServiceImpl.java index 8e7ee79..59d1d94 100644 --- a/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/service/impl/DataCatalogServiceImpl.java +++ b/data-catalog-api/server/src/main/java/org/apache/airavata/datacatalog/api/service/impl/DataCatalogServiceImpl.java @@ -15,6 +15,8 @@ import org.apache.airavata.datacatalog.api.mapper.MetadataSchemaMapper; import org.apache.airavata.datacatalog.api.model.DataProductEntity; import org.apache.airavata.datacatalog.api.model.MetadataSchemaEntity; import org.apache.airavata.datacatalog.api.model.MetadataSchemaFieldEntity; +import org.apache.airavata.datacatalog.api.query.MetadataSchemaQueryExecutor; +import org.apache.airavata.datacatalog.api.query.MetadataSchemaQueryResult; import org.apache.airavata.datacatalog.api.repository.DataProductRepository; import org.apache.airavata.datacatalog.api.repository.MetadataSchemaFieldRepository; import org.apache.airavata.datacatalog.api.repository.MetadataSchemaRepository; @@ -47,6 +49,9 @@ public class DataCatalogServiceImpl implements DataCatalogService { @Autowired MetadataSchemaFieldMapper metadataSchemaFieldMapper; + @Autowired + MetadataSchemaQueryExecutor metadataSchemaQueryExecutor; + @Override public DataProduct createDataProduct(DataProduct dataProduct) { @@ -97,6 +102,23 @@ public class DataCatalogServiceImpl implements DataCatalogService { return toDataProduct(savedDataProductEntity); } + @Override + public DataProduct removeDataProductFromMetadataSchema(String dataProductId, String schemaName) { + + // TODO: handle data product not found + DataProductEntity dataProduct = dataProductRepository.findByExternalId(dataProductId); + // TODO: handle metadata schema not found + MetadataSchemaEntity metadataSchemaEntity = metadataSchemaRepository.findBySchemaName(schemaName); + dataProduct.removeMetadataSchema(metadataSchemaEntity); + DataProductEntity savedDataProductEntity = dataProductRepository.save(dataProduct); + return toDataProduct(savedDataProductEntity); + } + + @Override + public MetadataSchemaQueryResult searchDataProducts(String sql) { + return metadataSchemaQueryExecutor.execute(sql); + } + @Override public MetadataSchema getMetadataSchema(String schemaName) { MetadataSchemaEntity metadataSchemaEntity = metadataSchemaRepository.findBySchemaName(schemaName); @@ -169,18 +191,6 @@ public class DataCatalogServiceImpl implements DataCatalogService { return fields; } - @Override - public DataProduct removeDataProductFromMetadataSchema(String dataProductId, String schemaName) { - - // TODO: handle data product not found - DataProductEntity dataProduct = dataProductRepository.findByExternalId(dataProductId); - // TODO: handle metadata schema not found - MetadataSchemaEntity metadataSchemaEntity = metadataSchemaRepository.findBySchemaName(schemaName); - dataProduct.removeMetadataSchema(metadataSchemaEntity); - DataProductEntity savedDataProductEntity = dataProductRepository.save(dataProduct); - return toDataProduct(savedDataProductEntity); - } - @Override public MetadataSchemaField updateMetadataSchemaField(MetadataSchemaField metadataSchemaField) { diff --git a/data-catalog-api/server/src/main/resources/logback.xml b/data-catalog-api/server/src/main/resources/logback.xml index 5cd56ea..3f5abdd 100644 --- a/data-catalog-api/server/src/main/resources/logback.xml +++ b/data-catalog-api/server/src/main/resources/logback.xml @@ -47,6 +47,10 @@ <logger name="org.apache.airavata" level="INFO"/> <logger name="org.hibernate" level="INFO"/> <logger name="net.schmizz.sshj" level="WARN"/> + <!-- <logger name="org.apache.airavata" level="DEBUG"/> --> + <!-- <logger name="org.springframework.transaction.interceptor" level="TRACE"/> --> + <!-- <logger name="org.springframework.transaction" level="TRACE"/> --> + <!-- <logger name="org.springframework.orm.jpa" level="TRACE"/> --> <root level="INFO"> <appender-ref ref="CONSOLE"/> <appender-ref ref="LOGFILE"/> diff --git a/data-catalog-api/stubs/src/main/proto/DataCatalogAPI.proto b/data-catalog-api/stubs/src/main/proto/DataCatalogAPI.proto index 2845271..a7a09c3 100644 --- a/data-catalog-api/stubs/src/main/proto/DataCatalogAPI.proto +++ b/data-catalog-api/stubs/src/main/proto/DataCatalogAPI.proto @@ -105,6 +105,30 @@ message DataProductDeleteRequest { } message DataProductDeleteResponse { } +message DataProductAddToMetadataSchemaRequest { + UserInfo user_info = 1; + string data_product_id = 2; + string schema_name = 3; +} +message DataProductAddToMetadataSchemaResponse { + DataProduct data_product = 1; +} +message DataProductRemoveFromMetadataSchemaRequest { + UserInfo user_info = 1; + string data_product_id = 2; + string schema_name = 3; +} +message DataProductRemoveFromMetadataSchemaResponse { + DataProduct data_product = 1; +} +message DataProductSearchRequest { + UserInfo user_info = 1; + string sql = 2; +} +message DataProductSearchResponse { + repeated DataProduct data_products = 1; +} + message MetadataSchemaGetRequest { UserInfo user_info = 1; @@ -161,23 +185,6 @@ message MetadataSchemaFieldListRequest { message MetadataSchemaFieldListResponse { repeated MetadataSchemaField metadata_schema_fields = 1; } -message DataProductAddToMetadataSchemaRequest { - UserInfo user_info = 1; - string data_product_id = 2; - string schema_name = 3; -} -message DataProductAddToMetadataSchemaResponse { - DataProduct data_product = 1; -} -message DataProductRemoveFromMetadataSchemaRequest { - UserInfo user_info = 1; - string data_product_id = 2; - string schema_name = 3; -} -message DataProductRemoveFromMetadataSchemaResponse { - DataProduct data_product = 1; -} - service DataCatalogAPIService { rpc createDataProduct(DataProductCreateRequest) returns (DataProductCreateResponse){} @@ -194,4 +201,5 @@ service DataCatalogAPIService { rpc getMetadataSchemaFields(MetadataSchemaFieldListRequest) returns (MetadataSchemaFieldListResponse){} rpc addDataProductToMetadataSchema(DataProductAddToMetadataSchemaRequest) returns (DataProductAddToMetadataSchemaResponse){} rpc removeDataProductFromMetadataSchema(DataProductRemoveFromMetadataSchemaRequest) returns (DataProductRemoveFromMetadataSchemaResponse){} + rpc searchDataProducts(DataProductSearchRequest) returns (DataProductSearchResponse){} }
