suneet-s commented on a change in pull request #9449: URL: https://github.com/apache/druid/pull/9449#discussion_r421834780
########## File path: server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; Review comment: Why did you choose this package for the sql ingestion classes? Other implementations of InputSource live under druid-core, not druid-server. And while we're on the subject of packages - have you thought about making this an extension? ########## File path: server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import org.apache.druid.initialization.DruidModule; + +import java.util.List; + +public class InputSourceModule implements DruidModule Review comment: javadocs please - same on all the new classes. Also this module name sounds very broad - have you thought about scoping this to a `SqlInputModule` ? Once we know the right package structure, I think the name of the module will make more sense. ########## File path: server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import org.apache.druid.initialization.DruidModule; + +import java.util.List; + +public class InputSourceModule implements DruidModule +{ + @Override + public List<? extends Module> getJacksonModules() Review comment: Can you write a ModuleTest that verifies all the injections are made correctly? ########## File path: server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.ResultSetException; +import org.skife.jdbi.v2.exceptions.StatementException; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class SqlEntity implements InputEntity +{ + private static final Logger LOG = new Logger(SqlEntity.class); + + private final String sql; + private final ObjectMapper objectMapper; + private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector; + private final boolean foldCase; + + public SqlEntity( + String sql, + SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + boolean foldCase, + ObjectMapper objectMapper + ) + { + this.sql = sql; + this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector; + this.foldCase = foldCase; + this.objectMapper = objectMapper; + } + + public String getSql() + { + return sql; + } + + @Nullable + @Override + public URI getUri() + { + return null; + } + + @Override + public InputStream open() + { + throw new UnsupportedOperationException("Please use fetch() instead"); Review comment: Why can't open() do this for you? Callers of the interface won't know about the inner workings of each implementation, so if we have a remediation, we should do this automatically. 
Inject a Supplier<File> into this class so you can get a temp dir; this will also make it easier to test. ########## File path: server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.ResultSetException; +import org.skife.jdbi.v2.exceptions.StatementException; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class SqlEntity implements InputEntity +{ + private static final Logger LOG = new Logger(SqlEntity.class); + + private final String sql; + private final ObjectMapper objectMapper; + private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector; + private final boolean foldCase; + + public SqlEntity( + String sql, + SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + boolean foldCase, + ObjectMapper objectMapper + ) + { + this.sql = sql; + this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector; + this.foldCase = foldCase; + this.objectMapper = objectMapper; + } + + public String getSql() + { + return sql; + } + + @Nullable + @Override + public URI getUri() + { + return null; + } + + @Override + public InputStream open() + { + throw new UnsupportedOperationException("Please use fetch() instead"); + } + + @Override + public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException + { + final File tempFile = File.createTempFile("druid-sql-entity", ".tmp", temporaryDirectory); + return 
openCleanableFile(sql, sqlFirehoseDatabaseConnector, objectMapper, foldCase, tempFile); + + } + + public static CleanableFile openCleanableFile( + String sql, + SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + ObjectMapper objectMapper, + boolean foldCase, + File tempFile + ) + throws IOException + { + Preconditions.checkNotNull(sqlFirehoseDatabaseConnector, "SQL Metadata Connector not configured!"); Review comment: Is this possible? The annotations indicate that this should always be `NonNull`. If this can be null, then I think it should be checked in the constructor - right now, there is the possibility that we create temp files that are never cleaned up if an exception is thrown on this line. ########## File path: server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.ResultSetException; +import org.skife.jdbi.v2.exceptions.StatementException; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class SqlEntity implements InputEntity +{ + private static final Logger LOG = new Logger(SqlEntity.class); + + private final String sql; + private final ObjectMapper objectMapper; + private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector; + private final boolean foldCase; + + public SqlEntity( + String sql, + SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + boolean foldCase, + ObjectMapper objectMapper + ) + { + this.sql = sql; + this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector; + this.foldCase = foldCase; + this.objectMapper = objectMapper; + } + + public String getSql() + { + return sql; + } + + @Nullable + @Override + public URI getUri() + { + return null; + } + + @Override + public InputStream open() + { + throw new UnsupportedOperationException("Please use fetch() instead"); + } + + @Override + public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException + { + final File tempFile = File.createTempFile("druid-sql-entity", ".tmp", temporaryDirectory); + return 
openCleanableFile(sql, sqlFirehoseDatabaseConnector, objectMapper, foldCase, tempFile); + + } + + public static CleanableFile openCleanableFile( + String sql, + SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + ObjectMapper objectMapper, + boolean foldCase, + File tempFile + ) + throws IOException + { + Preconditions.checkNotNull(sqlFirehoseDatabaseConnector, "SQL Metadata Connector not configured!"); + try (FileOutputStream fos = new FileOutputStream(tempFile)) { + final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos); + sqlFirehoseDatabaseConnector.retryWithHandle( + (handle) -> { + ResultIterator<Map<String, Object>> resultIterator = handle.createQuery( + sql + ).map( + (index, r, ctx) -> { + Map<String, Object> resultRow = foldCase ? new CaseFoldedMap() : new HashMap<>(); + ResultSetMetaData resultMetadata; + try { + resultMetadata = r.getMetaData(); + } + catch (SQLException e) { + throw new ResultSetException("Unable to obtain metadata from result set", e, ctx); + } + try { + for (int i = 1; i <= resultMetadata.getColumnCount(); i++) { + String key = resultMetadata.getColumnName(i); + String alias = resultMetadata.getColumnLabel(i); + Object value = r.getObject(i); + resultRow.put(alias != null ? alias : key, value); + } + } + catch (SQLException e) { + throw new ResultSetException("Unable to access specific metadata from " + + "result set metadata", e, ctx); + } + return resultRow; + } + ).iterator(); + jg.writeStartArray(); + while (resultIterator.hasNext()) { + jg.writeObject(resultIterator.next()); + } + jg.writeEndArray(); + jg.close(); Review comment: Are there any size restrictions here? What happens if I try to ingest a very large sql output? How big do the indexing machines need to be? How long can the db connection be kept open so that we can keep writing records to the temp file? Are there any logs that indicate what I need to do operationally to support this? 
########## File path: server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.ResultSetException; +import org.skife.jdbi.v2.exceptions.StatementException; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class SqlEntity implements InputEntity +{ + private static final Logger LOG = new Logger(SqlEntity.class); + + private 
final String sql; + private final ObjectMapper objectMapper; + private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector; + private final boolean foldCase; + + public SqlEntity( + String sql, + SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + boolean foldCase, + ObjectMapper objectMapper + ) + { + this.sql = sql; + this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector; + this.foldCase = foldCase; + this.objectMapper = objectMapper; + } + + public String getSql() + { + return sql; + } + + @Nullable + @Override + public URI getUri() + { + return null; + } + + @Override + public InputStream open() + { + throw new UnsupportedOperationException("Please use fetch() instead"); + } + + @Override + public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException + { + final File tempFile = File.createTempFile("druid-sql-entity", ".tmp", temporaryDirectory); + return openCleanableFile(sql, sqlFirehoseDatabaseConnector, objectMapper, foldCase, tempFile); + + } + + public static CleanableFile openCleanableFile( + String sql, + SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + ObjectMapper objectMapper, + boolean foldCase, + File tempFile + ) + throws IOException + { + Preconditions.checkNotNull(sqlFirehoseDatabaseConnector, "SQL Metadata Connector not configured!"); + try (FileOutputStream fos = new FileOutputStream(tempFile)) { + final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos); + sqlFirehoseDatabaseConnector.retryWithHandle( + (handle) -> { + ResultIterator<Map<String, Object>> resultIterator = handle.createQuery( + sql + ).map( + (index, r, ctx) -> { + Map<String, Object> resultRow = foldCase ? 
new CaseFoldedMap() : new HashMap<>(); + ResultSetMetaData resultMetadata; + try { + resultMetadata = r.getMetaData(); + } + catch (SQLException e) { + throw new ResultSetException("Unable to obtain metadata from result set", e, ctx); + } + try { + for (int i = 1; i <= resultMetadata.getColumnCount(); i++) { + String key = resultMetadata.getColumnName(i); + String alias = resultMetadata.getColumnLabel(i); + Object value = r.getObject(i); + resultRow.put(alias != null ? alias : key, value); + } + } + catch (SQLException e) { + throw new ResultSetException("Unable to access specific metadata from " + + "result set metadata", e, ctx); + } + return resultRow; Review comment: Can you add more comments in here for what this is trying to do ########## File path: server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.InputEntityIteratingReader; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; + +public class SqlInputSource extends AbstractInputSource implements SplittableInputSource<String> +{ + private final List<String> sqls; + private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector; + private final ObjectMapper objectMapper; + private final boolean foldCase; + + @JsonCreator + public SqlInputSource( + @JsonProperty("sqls") List<String> sqls, + @JsonProperty("foldCase") boolean foldCase, + @JsonProperty("database") SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + @JacksonInject @Smile ObjectMapper objectMapper + ) + { + Preconditions.checkArgument(sqls.size() > 0, "No SQL queries provided"); Review comment: Do we need null checks on all the json provided fields? Or is that handled by some annotation I'm not seeing? What is the default boolean value if `foldCase` is not specified? 
########## File path: server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.InputEntityIteratingReader; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; + +public 
class SqlInputSource extends AbstractInputSource implements SplittableInputSource<String> +{ + private final List<String> sqls; + private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector; + private final ObjectMapper objectMapper; + private final boolean foldCase; + + @JsonCreator + public SqlInputSource( + @JsonProperty("sqls") List<String> sqls, + @JsonProperty("foldCase") boolean foldCase, + @JsonProperty("database") SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + @JacksonInject @Smile ObjectMapper objectMapper + ) + { + Preconditions.checkArgument(sqls.size() > 0, "No SQL queries provided"); + + this.sqls = sqls; + this.foldCase = foldCase; + this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector; + this.objectMapper = objectMapper; + } + + @JsonProperty + public List<String> getSqls() + { + return sqls; + } + + @JsonProperty + public boolean isFoldCase() + { + return foldCase; + } + + @JsonProperty("database") + public SQLFirehoseDatabaseConnector getSQLFirehoseDatabaseConnector() + { + return sqlFirehoseDatabaseConnector; + } + + @Override + public Stream<InputSplit<String>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + return sqls.stream().map(InputSplit::new); + } + + @Override + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + return sqls.size(); + } + + @Override + public SplittableInputSource<String> withSplit(InputSplit<String> split) + { + return new SqlInputSource( + Collections.singletonList(split.get()), + foldCase, + sqlFirehoseDatabaseConnector, + objectMapper + ); + } + + @Override + protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory) + { + final SqlInputFormat inputFormat = new SqlInputFormat(objectMapper); + return new InputEntityIteratingReader( + inputRowSchema, + inputFormat, + createSplits(inputFormat, null) + .map(split -> new SqlEntity(split.get(), 
sqlFirehoseDatabaseConnector, foldCase, objectMapper)).iterator(), + temporaryDirectory + ); + } + + @Override + public boolean needsFormat() + { + return false; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SqlInputSource that = (SqlInputSource) o; + return foldCase == that.foldCase && + sqls.equals(that.sqls) && + sqlFirehoseDatabaseConnector.equals(that.sqlFirehoseDatabaseConnector) && + objectMapper.equals(that.objectMapper); + } + + @Override + public int hashCode() + { + return Objects.hash(sqls, sqlFirehoseDatabaseConnector, objectMapper, foldCase); + } Review comment: EqualsVerifier test please for equals and hashcode ########## File path: server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata.input; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.ResultSetException; +import org.skife.jdbi.v2.exceptions.StatementException; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class SqlEntity implements InputEntity +{ + private static final Logger LOG = new Logger(SqlEntity.class); + + private final String sql; + private final ObjectMapper objectMapper; + private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector; + private final boolean foldCase; + + public SqlEntity( + String sql, + SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + boolean foldCase, + ObjectMapper objectMapper + ) + { + this.sql = sql; + this.sqlFirehoseDatabaseConnector = sqlFirehoseDatabaseConnector; + this.foldCase = foldCase; + this.objectMapper = objectMapper; + } + + public String getSql() + { + return sql; + } + + @Nullable + @Override + public URI getUri() + { + return null; + } + + @Override + public InputStream open() + { + throw new UnsupportedOperationException("Please use fetch() instead"); + } + + @Override + public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException + { + final File tempFile = File.createTempFile("druid-sql-entity", ".tmp", temporaryDirectory); + return 
openCleanableFile(sql, sqlFirehoseDatabaseConnector, objectMapper, foldCase, tempFile); + + } + + public static CleanableFile openCleanableFile( + String sql, + SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector, + ObjectMapper objectMapper, + boolean foldCase, + File tempFile + ) + throws IOException + { + Preconditions.checkNotNull(sqlFirehoseDatabaseConnector, "SQL Metadata Connector not configured!"); + try (FileOutputStream fos = new FileOutputStream(tempFile)) { + final JsonGenerator jg = objectMapper.getFactory().createGenerator(fos); + sqlFirehoseDatabaseConnector.retryWithHandle( + (handle) -> { + ResultIterator<Map<String, Object>> resultIterator = handle.createQuery( + sql + ).map( + (index, r, ctx) -> { + Map<String, Object> resultRow = foldCase ? new CaseFoldedMap() : new HashMap<>(); + ResultSetMetaData resultMetadata; + try { + resultMetadata = r.getMetaData(); + } + catch (SQLException e) { + throw new ResultSetException("Unable to obtain metadata from result set", e, ctx); + } + try { + for (int i = 1; i <= resultMetadata.getColumnCount(); i++) { + String key = resultMetadata.getColumnName(i); + String alias = resultMetadata.getColumnLabel(i); + Object value = r.getObject(i); + resultRow.put(alias != null ? 
alias : key, value); + } + } + catch (SQLException e) { + throw new ResultSetException("Unable to access specific metadata from " + + "result set metadata", e, ctx); + } + return resultRow; + } + ).iterator(); + jg.writeStartArray(); + while (resultIterator.hasNext()) { + jg.writeObject(resultIterator.next()); + } + jg.writeEndArray(); + jg.close(); + return null; + }, + (exception) -> { + final boolean isStatementException = exception instanceof StatementException || + (exception instanceof CallbackFailedException + && exception.getCause() instanceof StatementException); + return sqlFirehoseDatabaseConnector.isTransientException(exception) && !(isStatementException); + } + ); + } + return new CleanableFile() + { + @Override + public File file() + { + return tempFile; + } + + @Override + public void close() + { + if (!tempFile.delete()) { + LOG.warn("Failed to remove file[%s]", tempFile.getAbsolutePath()); + } + } + }; + } Review comment: Can you add unit tests for this class please - same with SqlInputFormat ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
