turcsanyip commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1020962925
##########
nifi-api/src/main/java/org/apache/nifi/components/AllowableValue.java:
##########
@@ -68,6 +68,10 @@ public AllowableValue(final String value, final String
displayName, final String
this.description = description;
}
+ public static AllowableValue ofDescribedValue(final DescribedValue
describedValue) {
Review Comment:
Please provide Javadoc, as this is a public method in `nifi-api`.
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static
org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "filename", description = "The name of the
staged file in the internal stage"),
+ @ReadsAttribute(attribute = "path", description = "The relative path
to the staged file in the internal stage")
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+ description = "Staged file path")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Puts files into a Snowflake internal stage. The
internal stage must be created in the Snowflake account beforehand."
+ + " This processor can be connected to an StartSnowflakeIngest
processor to ingest the file in the internal stage")
Review Comment:
```suggestion
+ " This processor can be connected to a StartSnowflakeIngest
processor to ingest the file in the internal stage.")
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/common/Attributes.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake.common;
+
+public final class Attributes {
Review Comment:
`Attributes` is quite a generic name in the context of NiFi processors. We
typically add a specific prefix to such class names, e.g. `SnowflakeAttributes`.
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/CommonProperties.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public final class CommonProperties {
Review Comment:
We typically give the property holder class a specific prefix, e.g.
`SnowflakeProperties`.
##########
nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java:
##########
@@ -562,6 +562,11 @@ public Builder dependsOn(final PropertyDescriptor
property, final AllowableValue
return this;
}
+ public Builder dependsOn(final PropertyDescriptor property, final
DescribedValue... describedValues) {
Review Comment:
Please provide Javadoc, as this is a public method in `nifi-api`.
##########
nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java:
##########
@@ -562,6 +562,11 @@ public Builder dependsOn(final PropertyDescriptor
property, final AllowableValue
return this;
}
+ public Builder dependsOn(final PropertyDescriptor property, final
DescribedValue... describedValues) {
Review Comment:
The parameter name `dependentValues` would be better (consistent with
`dependsOn(final PropertyDescriptor property, final AllowableValue...
dependentValues)`):
```suggestion
public Builder dependsOn(final PropertyDescriptor property, final
DescribedValue... dependentValues) {
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import java.util.stream.Stream;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import
org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService;
+
+public enum AccountIdentifierFormat implements DescribedValue {
+ FULL_URL("full-url", "Full URL", "Provide an account identifier in a
single property") {
+ @Override
+ public String getAccount(ConfigurationContext context) {
+ final String[] hostParts = buildHost(context).split("\\.");
+ if (hostParts.length == 0) {
+ throw new IllegalArgumentException("Invalid Snowflake host
url");
+ }
+ return hostParts[0];
+ }
+
+ @Override
+ public String buildHost(final ConfigurationContext context) {
+ return
context.getProperty(StandardSnowflakeIngestManagerProviderService.HOST_URL)
+ .evaluateAttributeExpressions()
+ .getValue();
+ }
+ },
+ ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account
Name") {
+ @Override
+ public String getAccount(ConfigurationContext context) {
+ final String organizationName =
context.getProperty(StandardSnowflakeIngestManagerProviderService.ORGANIZATION_NAME)
+ .evaluateAttributeExpressions()
+ .getValue();
+ final String accountName =
context.getProperty(StandardSnowflakeIngestManagerProviderService.ACCOUNT_NAME)
+ .evaluateAttributeExpressions()
+ .getValue();
+ return organizationName + "-" + accountName;
+ }
+
+ @Override
+ public String buildHost(final ConfigurationContext context) {
+ return getAccount(context) + ".snowflakecomputing.com";
Review Comment:
A constant could be used here too, for the `".snowflakecomputing.com"` suffix.
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static
org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.IngestResponseException;
+import net.snowflake.ingest.utils.StagedFileWrapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+ @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description =
"Staged file path")
+})
+@Tags({"snowflake", "snowpipe", "ingest"})
+@CapabilityDescription("Ingest files in a Snowflake stage. The stage must be
created in the Snowflake account beforehand."
+ + " The result of the ingestion is not available immediately, so this
processor can be connected to an"
+ + " GetSnowflakeIngestStatus processor to wait for the results")
Review Comment:
```suggestion
@CapabilityDescription("Ingests files from a Snowflake internal or external
stage into a Snowflake table. The stage must be created in the Snowflake
account beforehand."
+ " The result of the ingestion is not available immediately, so
this processor can be connected to a"
+ " GetSnowflakeIngestStatus processor to wait for the results.")
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service;
+
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.key.service.api.PrivateKeyService;
+import org.apache.nifi.processor.util.StandardValidators;
+import
org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
+import org.apache.nifi.snowflake.service.util.CommonProperties;
+
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe
processors")
+public class StandardSnowflakeIngestManagerProviderService extends
AbstractControllerService
+ implements SnowflakeIngestManagerProviderService {
+
+ public static final PropertyDescriptor ACCOUNT_IDENTIFIER_FORMAT = new
PropertyDescriptor.Builder()
+ .name("account-identifier-format")
+ .displayName("Account Identifier Format")
+ .description("The format of the account identifier.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .required(true)
+ .allowableValues(AccountIdentifierFormat.class)
+ .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue())
+ .build();
+
+ public static final PropertyDescriptor HOST_URL = new
PropertyDescriptor.Builder()
+ .name("host-url")
+ .displayName("Snowflake URL")
+ .description("Example host url:
[account-locator].[cloud-region].[cloud].snowflakecomputing.com")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.FULL_URL)
+ .build();
+
+ public static final PropertyDescriptor ACCOUNT_LOCATOR = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor CLOUD_REGION = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor CLOUD_TYPE = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor ORGANIZATION_NAME = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_NAME)
+ .build();
+
+ public static final PropertyDescriptor ACCOUNT_NAME = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_NAME)
+ .build();
+
+ public static final PropertyDescriptor USER_NAME = new
PropertyDescriptor.Builder()
+ .name("user-name")
+ .displayName("User Name")
+ .description("The Snowflake user name.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor PIPE_NAME = new
PropertyDescriptor.Builder()
+ .name("pipe-name")
+ .displayName("Pipe Name")
+ .description("The Snowflake pipe's name to ingest from.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor PRIVATE_KEY_SERVICE = new
PropertyDescriptor.Builder()
+ .name("private-key-service")
+ .displayName("Private Key Service")
+ .description("Specifies the Controller Service that will provide
the private key. The public key needs to be added to the user account in the
Snowflake account beforehand.")
+ .identifiesControllerService(PrivateKeyService.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor HOST_SCHEME = new
PropertyDescriptor.Builder()
+ .name("host-scheme")
+ .displayName("Host Scheme")
+ .description("The scheme of the host url to connect to.")
+ .allowableValues("http", "https")
+ .defaultValue("https")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor HOST_PORT = new
PropertyDescriptor.Builder()
+ .name("host-port")
+ .displayName("Host Port")
+ .description("The port of the host url to connect to.")
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("443")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
Review Comment:
I don't think we will need these properties. We can add them for special use
cases later if needed.
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/AccountIdentifierFormat.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import java.util.stream.Stream;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import
org.apache.nifi.snowflake.service.StandardSnowflakeIngestManagerProviderService;
+
+public enum AccountIdentifierFormat implements DescribedValue {
+ FULL_URL("full-url", "Full URL", "Provide an account identifier in a
single property") {
+ @Override
+ public String getAccount(ConfigurationContext context) {
+ final String[] hostParts = buildHost(context).split("\\.");
+ if (hostParts.length == 0) {
+ throw new IllegalArgumentException("Invalid Snowflake host
url");
+ }
+ return hostParts[0];
+ }
+
+ @Override
+ public String buildHost(final ConfigurationContext context) {
+ return
context.getProperty(StandardSnowflakeIngestManagerProviderService.HOST_URL)
+ .evaluateAttributeExpressions()
+ .getValue();
+ }
+ },
+ ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account
Name") {
+ @Override
+ public String getAccount(ConfigurationContext context) {
+ final String organizationName =
context.getProperty(StandardSnowflakeIngestManagerProviderService.ORGANIZATION_NAME)
+ .evaluateAttributeExpressions()
+ .getValue();
+ final String accountName =
context.getProperty(StandardSnowflakeIngestManagerProviderService.ACCOUNT_NAME)
+ .evaluateAttributeExpressions()
+ .getValue();
+ return organizationName + "-" + accountName;
+ }
+
+ @Override
+ public String buildHost(final ConfigurationContext context) {
+ return getAccount(context) + ".snowflakecomputing.com";
+ }
+ },
+ ACCOUNT_LOCATOR("account-locator", "Account Locator", "Provide a Snowflake
Account Locator") {
+ @Override
+ public String getAccount(ConfigurationContext context) {
+ return
context.getProperty(StandardSnowflakeIngestManagerProviderService.ACCOUNT_LOCATOR)
+ .evaluateAttributeExpressions()
+ .getValue();
+ }
+
+ @Override
+ public String buildHost(final ConfigurationContext context) {
+ final String accountLocator =
context.getProperty(StandardSnowflakeIngestManagerProviderService.ACCOUNT_LOCATOR)
+ .evaluateAttributeExpressions()
+ .getValue();
+ final String cloudRegion =
context.getProperty(StandardSnowflakeIngestManagerProviderService.CLOUD_REGION)
+ .evaluateAttributeExpressions()
+ .getValue();
+ final String cloudType =
context.getProperty(StandardSnowflakeIngestManagerProviderService.CLOUD_TYPE)
+ .evaluateAttributeExpressions()
+ .getValue();
+ final StringBuilder hostBuilder = new StringBuilder();
+ hostBuilder.append(accountLocator)
+ .append(".").append(cloudRegion);
+ if (cloudType != null) {
+ hostBuilder.append(".").append(cloudType);
+ }
+ hostBuilder.append(".snowflakecomputing.com");
+ return hostBuilder.toString();
+ }
+ };
+
+ private final String value;
+ private final String displayName;
+ private final String description;
+
+ AccountIdentifierFormat(final String value, final String displayName,
final String description) {
+ this.value = value;
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ public abstract String getAccount(final ConfigurationContext context);
+ public abstract String buildHost(final ConfigurationContext context);
Review Comment:
I would rename it to `getHostname()`. Whether the hostname can be retrieved
from a single property or needs to be built from multiple ones depends on the
account identifier type. That is an implementation detail, so the method name
should not reflect it.
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import static
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ACCOUNT_LOCATOR;
+import static
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ACCOUNT_NAME;
+import static
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_CLOUD_REGION;
+import static
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_CLOUD_TYPE;
+import static
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ORGANIZATION_NAME;
+import static
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_URL;
+
+import java.util.stream.Stream;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.controller.ConfigurationContext;
+
+public enum ConnectionUrlFormat implements DescribedValue {
+ FULL_URL("full-url", "Full URL", "Provide connection URL in a single
property") {
+ @Override
+ public String buildConnectionUrl(final ConfigurationContext context) {
+ String snowflakeUrl =
context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue();
+ if (!snowflakeUrl.startsWith(SNOWFLAKE_SCHEME)) {
+ snowflakeUrl = SNOWFLAKE_URI_PREFIX + snowflakeUrl;
+ }
+
+ return snowflakeUrl;
+ }
+ },
+ ACCOUNT_NAME("account-name", "Account Name", "Provide a Snowflake Account
Name") {
+ @Override
+ public String buildConnectionUrl(ConfigurationContext context) {
+ final String organizationName =
context.getProperty(SNOWFLAKE_ORGANIZATION_NAME)
+ .evaluateAttributeExpressions()
+ .getValue();
+ final String accountName =
context.getProperty(SNOWFLAKE_ACCOUNT_NAME)
+ .evaluateAttributeExpressions()
+ .getValue();
+
+ return SNOWFLAKE_URI_PREFIX + organizationName + "-" + accountName
+ ".snowflakecomputing.com";
Review Comment:
It may be worth introducing a `SNOWFLAKE_URI_SUFFIX` constant for
`".snowflakecomputing.com"` (also used at line 74).
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static
org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "filename", description = "The name of the
staged file in the internal stage"),
+ @ReadsAttribute(attribute = "path", description = "The relative path
to the staged file in the internal stage")
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+ description = "Staged file path")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
Review Comment:
The `snowpipe` tag could be applied here too.
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static
org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.HistoryResponse;
+import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
+import net.snowflake.ingest.connection.IngestResponseException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Stateful(scopes = Scope.CLUSTER,
+ description = "The 'begin mark' from the response of a history request
is stored to keep track of already requested history time range.")
+@DefaultSettings(penaltyDuration = "5 sec")
+@ReadsAttributes({
+ @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description =
"Staged file path")
+})
+@Tags({"snowflake", "snowpipe", "ingest", "history"})
+@CapabilityDescription("Waits until a file in a Snowflake stage is ingested.
The stage must be created in the Snowflake account beforehand."
+ + " This processor is usually connected to an upstream
StartSnowflakeIngest processor to make sure that the file is ingested")
Review Comment:
```suggestion
@CapabilityDescription("Waits until a file in a Snowflake stage is ingested.
The stage must be created in the Snowflake account beforehand."
+ " This processor is usually connected to an upstream
StartSnowflakeIngest processor to make sure that the file is ingested.")
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service;
+
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.key.service.api.PrivateKeyService;
+import org.apache.nifi.processor.util.StandardValidators;
+import
org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
+import org.apache.nifi.snowflake.service.util.CommonProperties;
+
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe
processors")
+public class StandardSnowflakeIngestManagerProviderService extends
AbstractControllerService
+ implements SnowflakeIngestManagerProviderService {
+
+ public static final PropertyDescriptor ACCOUNT_IDENTIFIER_FORMAT = new
PropertyDescriptor.Builder()
+ .name("account-identifier-format")
+ .displayName("Account Identifier Format")
+ .description("The format of the account identifier.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .required(true)
+ .allowableValues(AccountIdentifierFormat.class)
+ .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue())
+ .build();
+
+ public static final PropertyDescriptor HOST_URL = new
PropertyDescriptor.Builder()
+ .name("host-url")
+ .displayName("Snowflake URL")
+ .description("Example host url:
[account-locator].[cloud-region].[cloud].snowflakecomputing.com")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.FULL_URL)
+ .build();
+
+ public static final PropertyDescriptor ACCOUNT_LOCATOR = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor CLOUD_REGION = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor CLOUD_TYPE = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor ORGANIZATION_NAME = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_NAME)
+ .build();
+
+ public static final PropertyDescriptor ACCOUNT_NAME = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_NAME)
+ .build();
+
+ public static final PropertyDescriptor USER_NAME = new
PropertyDescriptor.Builder()
+ .name("user-name")
+ .displayName("User Name")
+ .description("The Snowflake user name.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor PIPE_NAME = new
PropertyDescriptor.Builder()
+ .name("pipe-name")
+ .displayName("Pipe Name")
+ .description("The Snowflake pipe's name to ingest from.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
Review Comment:
I would consider splitting the property into `Database`, `Schema` and `Pipe`.
Otherwise, the description should mention that the fully qualified name
(including the database and the schema) is expected. However, the
3 properties would be more straightforward in my opinion.
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/StandardSnowflakeIngestManagerProviderService.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service;
+
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.key.service.api.PrivateKeyService;
+import org.apache.nifi.processor.util.StandardValidators;
+import
org.apache.nifi.processors.snowflake.SnowflakeIngestManagerProviderService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.snowflake.service.util.AccountIdentifierFormat;
+import org.apache.nifi.snowflake.service.util.CommonProperties;
+
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Provides a Snowflake Ingest Manager for Snowflake pipe
processors")
+public class StandardSnowflakeIngestManagerProviderService extends
AbstractControllerService
+ implements SnowflakeIngestManagerProviderService {
+
+ public static final PropertyDescriptor ACCOUNT_IDENTIFIER_FORMAT = new
PropertyDescriptor.Builder()
+ .name("account-identifier-format")
+ .displayName("Account Identifier Format")
+ .description("The format of the account identifier.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .required(true)
+ .allowableValues(AccountIdentifierFormat.class)
+ .defaultValue(AccountIdentifierFormat.ACCOUNT_NAME.getValue())
+ .build();
+
+ public static final PropertyDescriptor HOST_URL = new
PropertyDescriptor.Builder()
+ .name("host-url")
+ .displayName("Snowflake URL")
+ .description("Example host url:
[account-locator].[cloud-region].[cloud].snowflakecomputing.com")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.FULL_URL)
+ .build();
+
+ public static final PropertyDescriptor ACCOUNT_LOCATOR = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor CLOUD_REGION = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor CLOUD_TYPE = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor ORGANIZATION_NAME = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_NAME)
+ .build();
+
+ public static final PropertyDescriptor ACCOUNT_NAME = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+ .dependsOn(ACCOUNT_IDENTIFIER_FORMAT,
AccountIdentifierFormat.ACCOUNT_NAME)
+ .build();
+
+ public static final PropertyDescriptor USER_NAME = new
PropertyDescriptor.Builder()
+ .name("user-name")
+ .displayName("User Name")
+ .description("The Snowflake user name.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor PIPE_NAME = new
PropertyDescriptor.Builder()
+ .name("pipe-name")
+ .displayName("Pipe Name")
+ .description("The Snowflake pipe's name to ingest from.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor PRIVATE_KEY_SERVICE = new
PropertyDescriptor.Builder()
+ .name("private-key-service")
+ .displayName("Private Key Service")
+ .description("Specifies the Controller Service that will provide
the private key. The public key needs to be added to the user account in the
Snowflake account beforehand.")
+ .identifiesControllerService(PrivateKeyService.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor HOST_SCHEME = new
PropertyDescriptor.Builder()
+ .name("host-scheme")
+ .displayName("Host Scheme")
+ .description("The scheme of the host url to connect to.")
+ .allowableValues("http", "https")
+ .defaultValue("https")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor HOST_PORT = new
PropertyDescriptor.Builder()
+ .name("host-port")
+ .displayName("Host Port")
+ .description("The port of the host url to connect to.")
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("443")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
+ ACCOUNT_IDENTIFIER_FORMAT,
+ HOST_URL,
+ ACCOUNT_LOCATOR,
+ CLOUD_REGION,
+ CLOUD_TYPE,
+ ORGANIZATION_NAME,
+ ACCOUNT_NAME,
+ USER_NAME,
+ PIPE_NAME,
+ PRIVATE_KEY_SERVICE,
Review Comment:
Please move `Private Key Service` up, just after `User Name`, because it is
the credential belonging to the user.
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java:
##########
@@ -55,34 +63,107 @@
description = "Snowflake JDBC driver property name prefixed with
'SENSITIVE.' handled as a sensitive property.")
})
@RequiresInstanceClassLoading
-public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool implements DBCPService {
+public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService {
+
+ public static final PropertyDescriptor CONNECTION_URL_FORMAT = new
PropertyDescriptor.Builder()
+ .name("connection-url-format")
+ .displayName("Connection URL Format")
+ .description("The format of the connection URL.")
+ .allowableValues(ConnectionUrlFormat.class)
+ .required(true)
+ .defaultValue(ConnectionUrlFormat.FULL_URL.getValue())
+ .build();
public static final PropertyDescriptor SNOWFLAKE_URL = new
PropertyDescriptor.Builder()
- .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
- .displayName("Snowflake URL")
- .description("Example connection string:
jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]"
+
- " The connection parameters can include db=DATABASE_NAME to avoid
using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
- .build();
+ .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
+ .displayName("Snowflake URL")
+ .description("Example connection string:
jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]"
+
+ " The connection parameters can include db=DATABASE_NAME
to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
+ .required(true)
+ .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.FULL_URL)
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_LOCATOR = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+ .dependsOn(CONNECTION_URL_FORMAT,
ConnectionUrlFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_CLOUD_REGION = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+ .dependsOn(CONNECTION_URL_FORMAT,
ConnectionUrlFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_CLOUD_TYPE = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+ .dependsOn(CONNECTION_URL_FORMAT,
ConnectionUrlFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_ORGANIZATION_NAME = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+ .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_NAME = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+ .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
+ .build();
public static final PropertyDescriptor SNOWFLAKE_USER = new
PropertyDescriptor.Builder()
- .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
- .displayName("Snowflake User")
- .description("The Snowflake user name")
- .build();
+ .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
+ .displayName("Snowflake User")
Review Comment:
I would consider removing the `Snowflake` prefix and simply using
`Username` / `Password` as property names, because the other properties (like
`Account Locator` or `Database`) correctly do not use the prefix.
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static
org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "filename", description = "The name of the
staged file in the internal stage"),
+ @ReadsAttribute(attribute = "path", description = "The relative path
to the staged file in the internal stage")
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+ description = "Staged file path")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Puts files into a Snowflake internal stage. The
internal stage must be created in the Snowflake account beforehand."
+ + " This processor can be connected to an StartSnowflakeIngest
processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+ static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("snowflake-connection-provider")
+ .displayName("Snowflake Connection Provider")
+ .description("Specifies the Controller Service to use for creating
SQL connections to Snowflake.")
+
.identifiesControllerService(SnowflakeConnectionProviderService.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor INTERNAL_STAGE_NAME = new
PropertyDescriptor.Builder()
+ .name("internal-stage-name")
+ .displayName("Internal Stage Name")
+ .description("The name of the internal stage in the Snowflake
account to put files into.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .build();
Review Comment:
Similar to the `Pipe Name` in the
`StandardSnowflakeIngestManagerProviderService`, separate properties for
`Database` and `Schema` could be used here too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]