turcsanyip commented on code in PR #6584: URL: https://github.com/apache/nifi/pull/6584#discussion_r1024914432
########## nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services-api/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeProperties.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake.util; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +public final class SnowflakeProperties { + private SnowflakeProperties() { + } + + public static final PropertyDescriptor ACCOUNT_LOCATOR = new PropertyDescriptor.Builder() + .name("account-locator") + .displayName("Account Locator") + .description("Snowflake account locator to use for connection.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + public static final PropertyDescriptor CLOUD_REGION = new PropertyDescriptor.Builder() + .name("cloud-region") + .displayName("Cloud Region") + .description("Snowflake cloud region to use for connection.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + 
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + public static final PropertyDescriptor CLOUD_TYPE = new PropertyDescriptor.Builder() + .name("cloud-type") + .displayName("Cloud Type") + .description("Snowflake cloud type to use for connection.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor ORGANIZATION_NAME = new PropertyDescriptor.Builder() + .name("organization-name") + .displayName("Organization Name") + .description("Snowflake organization name to use for connection.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder() + .name("account-name") + .displayName("Account Name") + .description("Snowflake account name to use for connection.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + public static final PropertyDescriptor DATABASE = new PropertyDescriptor.Builder() + .name("database") + .displayName("Database") + .description("The database to use by default. The same as passing 'db=DATABASE_NAME' to the connection string.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() + .name("schema") + .displayName("Schema") + .description("The schema to use by default. 
The same as passing 'schema=SCHEMA' to the connection string.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .defaultValue("PUBLIC") Review Comment: I think we should omit the default value. If a property has a default value, it cannot be emptied and the property always has a value. Having a default value for the `Schema` property, it would not be possible to leave the property empty in `PutSnowflakeInternalStage` and fall back to the schema configured in `SnowflakeComputingConnectionPool` or to leave the property empty in `SnowflakeComputingConnectionPool` and fall back to the user's default schema configured in Snowflake. ########## nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/util/SnowflakeInternalStageType.java: ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.snowflake.util; + +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.nifi.components.DescribedValue; + +public enum SnowflakeInternalStageType implements DescribedValue { + USER("user", "User", "Use the user's internal stage") { + @Override + public String getStageName(final SnowflakeInternalStageTypeParameters parameters) { + return "@~"; + } + }, + TABLE("table", "Table", "Use a table's internal stage") { + @Override + public String getStageName(final SnowflakeInternalStageTypeParameters parameters) { + final StringBuilder stringBuilder = new StringBuilder("@"); + Optional.ofNullable(parameters.getDatabase()) + .ifPresent(database -> stringBuilder.append(database).append(".")); + Optional.ofNullable(parameters.getSchema()) + .ifPresent(schema -> stringBuilder.append(schema).append(".")); + + stringBuilder.append("%").append(Objects.requireNonNull(parameters.getTable())); + return stringBuilder.toString(); + } + }, + NAMED("named", "Named", "Use a named internal stage. This stage must be created beforehand in Snowflake") { + @Override + public String getStageName(final SnowflakeInternalStageTypeParameters parameters) { + final StringBuilder stringBuilder = new StringBuilder("@"); + Optional.ofNullable(parameters.getDatabase()) + .ifPresent(database -> stringBuilder.append(database).append(".")); + Optional.ofNullable(parameters.getSchema()) + .ifPresent(schema -> stringBuilder.append(schema).append(".")); + stringBuilder.append(Objects.requireNonNull(parameters.getStageName())); + return stringBuilder.toString(); Review Comment: It is a bit confusing that `getStageName()` is used in both `SnowflakeInternalStageTypeParameters` and `SnowflakeInternalStageType`. It could be called `getStageIdentifier()` in `SnowflakeInternalStageType`. 
########## nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java: ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake; + +import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH; + +import java.io.IOException; +import java.io.InputStream; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import 
org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageType; +import org.apache.nifi.processors.snowflake.util.SnowflakeInternalStageTypeParameters; +import org.apache.nifi.processors.snowflake.util.SnowflakeProperties; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, + description = "Staged file path") +}) +@Tags({"snowflake", "jdbc", "database", "connection", "snowpipe"}) +@CapabilityDescription("Puts files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand." + + " This processor can be connected to a StartSnowflakeIngest processor to ingest the file in the internal stage") +@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class}) +public class PutSnowflakeInternalStage extends AbstractProcessor { + + public static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder() + .name("snowflake-connection-provider") + .displayName("Snowflake Connection Provider") + .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.") + .identifiesControllerService(SnowflakeConnectionProviderService.class) + .required(true) + .build(); + + public static final PropertyDescriptor INTERNAL_STAGE_TYPE = new PropertyDescriptor.Builder() + .name("internal-stage-type") + .displayName("Internal Stage Type") + .description("The type of internal stage to use") + .allowableValues(SnowflakeInternalStageType.class) + .required(true) + .build(); + + public static final PropertyDescriptor DATABASE = new 
PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.DATABASE) + .dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED, SnowflakeInternalStageType.TABLE) + .build(); + + public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SnowflakeProperties.SCHEMA) + .dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.NAMED, SnowflakeInternalStageType.TABLE) + .build(); + + public static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() + .name("table") + .displayName("Table") + .description("The name of the table in the Snowflake account.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .dependsOn(INTERNAL_STAGE_TYPE, SnowflakeInternalStageType.TABLE) + .build(); + + public static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder() + .name("internal-stage-name") + .displayName("Internal Stage Name") Review Comment: Similar to `Table` or `Pipe`, I would call it simply `Stage` on the UI. ########## nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.snowflake; + +import static org.apache.nifi.processors.snowflake.util.SnowflakeAttributes.ATTRIBUTE_STAGED_FILE_PATH; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import net.snowflake.ingest.SimpleIngestManager; +import net.snowflake.ingest.connection.HistoryResponse; +import net.snowflake.ingest.connection.HistoryResponse.FileEntry; +import net.snowflake.ingest.connection.IngestResponseException; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import 
org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@Stateful(scopes = Scope.CLUSTER, + description = "The 'begin mark' from the response of a history request is stored to keep track of already requested history time range.") +@DefaultSettings(penaltyDuration = "5 sec") +@ReadsAttributes({ + @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "Staged file path") +}) +@Tags({"snowflake", "snowpipe", "ingest", "history"}) +@CapabilityDescription("Waits until a file in a Snowflake stage is ingested. The stage must be created in the Snowflake account beforehand." + + " This processor is usually connected to an upstream StartSnowflakeIngest processor to make sure that the file is ingested.") +@SeeAlso({StartSnowflakeIngest.class, PutSnowflakeInternalStage.class}) +public class GetSnowflakeIngestStatus extends AbstractProcessor { + + public static final PropertyDescriptor INGEST_MANAGER_PROVIDER = new PropertyDescriptor.Builder() + .name("ingest-manager-provider") + .displayName("Ingest Manager Provider") + .description("Specifies the Controller Service to use for ingesting Snowflake staged files.") + .identifiesControllerService(SnowflakeIngestManagerProviderService.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("For FlowFiles of successful ingestion") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("For FlowFiles of failed ingestion") + .build(); + + static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("For FlowFiles whose file is still not ingested. 
These FlowFiles should be routed back to this processor to try again later") + .build(); + + static final List<PropertyDescriptor> PROPERTIES = Collections.singletonList( + INGEST_MANAGER_PROVIDER + ); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS, + REL_RETRY, + REL_FAILURE + ))); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String stagedFilePath = flowFile.getAttribute(ATTRIBUTE_STAGED_FILE_PATH); + if (stagedFilePath == null) { + getLogger().error("Missing required attribute [\"" + ATTRIBUTE_STAGED_FILE_PATH + "\"] for FlowFile"); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final SnowflakeIngestManagerProviderService ingestManagerProviderService = + context.getProperty(INGEST_MANAGER_PROVIDER) + .asControllerService(SnowflakeIngestManagerProviderService.class); + final String beginMarkKey = stagedFilePath + ".begin.mark"; + final StateManager stateManager = StateManager.create(beginMarkKey, session); + final String beginMark = stateManager.getBeginMark(); + final HistoryResponse historyResponse; + try { + final SimpleIngestManager snowflakeIngestManager = ingestManagerProviderService.getIngestManager(); + historyResponse = snowflakeIngestManager.getHistory(null, null, beginMark); + } catch (URISyntaxException | IOException e) { + throw new ProcessException("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", e); + } catch (IngestResponseException e) { + getLogger().error("Failed to get Snowflake ingest history for staged file [" + stagedFilePath + "]", e); + 
session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final Optional<FileEntry> fileEntry = Optional.ofNullable(historyResponse.files) + .flatMap(files -> files.stream() + .filter(entry -> entry.getPath().equals(stagedFilePath) && entry.isComplete()) + .findFirst()); + + if (!fileEntry.isPresent()) { + stateManager.saveBeginMarkToState(historyResponse.getNextBeginMark()); + session.transfer(session.penalize(flowFile), REL_RETRY); + return; + } + + stateManager.removeBeginMarkFromState(); + if (fileEntry.get().getErrorsSeen() > 0) { + getLogger().error("Failed to ingest file [" + stagedFilePath + "] in Snowflake stage via pipe [" + ingestManagerProviderService.getPipeName() + "]." + + " Error: " + fileEntry.get().getFirstError()); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + session.transfer(flowFile, REL_SUCCESS); + + } + + private static class StateManager { + + private final StateMap stateMap; + private final String beginMarkKey; + private final ProcessSession session; + + public StateManager(final StateMap stateMap, final String beginMarkKey, final ProcessSession session) { + this.stateMap = stateMap; + this.beginMarkKey = beginMarkKey; + this.session = session; + } + + public static StateManager create(final String beginMarkKey, final ProcessSession session) { + try { + return new StateManager(session.getState(Scope.CLUSTER), beginMarkKey, session); + } catch (IOException e) { + throw new ProcessException("Failed to get state", e); + } + } + + public String getBeginMark() { + return stateMap.get(beginMarkKey); + } + + public void saveBeginMarkToState(final String newBeginMark) { + final Map<String, String> newState = new HashMap<>(stateMap.toMap()); + newState.put(beginMarkKey, newBeginMark); + try { + session.setState(newState, Scope.CLUSTER); Review Comment: I'm afraid the processor cannot use the state in this way because the parallel executions would overwrite each other's state (multiple processor 
instances in a cluster or multiple threads on a single node). To avoid it, I think `LOCAL` scope should be used and maybe also single thread execution. As far as I understand, it is an optimization to store the history mark instead of querying the full history all the time. So some state overriding may not be critical. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
