Lehel44 commented on code in PR #6584:
URL: https://github.com/apache/nifi/pull/6584#discussion_r1014298984
##########
nifi-nar-bundles/nifi-extension-utils/nifi-dbcp-base/src/main/java/org/apache/nifi/dbcp/AbstractDBCPConnectionPool.java:
##########
@@ -496,6 +499,15 @@ protected Driver getDriver(final String driverName, final String url) {
}
}
+ /**
+ * Override in subclasses to provide connection properties to the data source
+ *
+ * @return Key-value pairs that will be added as connection properties
+ */
+ protected Map<String, String> getConnectionProperties(final ConfigurationContext context) {
Review Comment:
Can this method be abstract to force overriding?
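For reference, a minimal sketch of the trade-off behind this question. All type names here (`SketchContext`, `DefaultHookPool`, `AbstractHookPool`, `PlainPool`, `WarehousePool`) are hypothetical stand-ins, not classes from the PR, and the default shown for the non-abstract variant (an empty map) is an assumption, since the method body is not visible in this hunk.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for NiFi's ConfigurationContext so the sketch compiles on its own.
interface SketchContext {
    String getValue(String propertyName);
}

// Variant 1: a hook with an empty default; subclasses override it only when needed.
abstract class DefaultHookPool {
    protected Map<String, String> getConnectionProperties(final SketchContext context) {
        return Collections.emptyMap();
    }

    final int countConnectionProperties(final SketchContext context) {
        return getConnectionProperties(context).size();
    }
}

// Variant 2: an abstract hook; every concrete subclass is forced to implement it.
abstract class AbstractHookPool {
    protected abstract Map<String, String> getConnectionProperties(SketchContext context);

    final int countConnectionProperties(final SketchContext context) {
        return getConnectionProperties(context).size();
    }
}

// Under variant 1 an existing subclass keeps compiling without any change.
class PlainPool extends DefaultHookPool {
}

// Under variant 2 the subclass must supply the method, even for a single property.
class WarehousePool extends AbstractHookPool {
    @Override
    protected Map<String, String> getConnectionProperties(final SketchContext context) {
        return Collections.singletonMap("warehouse", context.getValue("warehouse"));
    }
}
```

Making the method abstract would require every existing subclass of the base class to add an implementation, so the non-abstract default is the less invasive option if most subclasses have nothing to add.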
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.HistoryResponse;
+import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
+import net.snowflake.ingest.connection.IngestResponseException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Stateful(scopes = Scope.CLUSTER,
+ description = "The 'begin mark' from the response of a history request is stored to keep track of already requested history time range.")
+@DefaultSettings(penaltyDuration = "5 sec")
+@ReadsAttributes({
+ @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description = "The path to the file in the stage")
Review Comment:
Does "in the stage" mean anything different from "staged"? Could the description simply be, e.g., "The staged file path"?
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+ @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+ description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+ + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+ static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
+ .name("snowflake-connection-provider")
+ .displayName("Snowflake Connection Provider")
+ .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
+ .identifiesControllerService(SnowflakeConnectionProviderService.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
+ .name("internal-stage-name")
+ .displayName("Internal Stage Name")
+ .description("The name of the internal stage in the Snowflake account to put files into.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
Review Comment:
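Why not FlowFile attribute scope (`ExpressionLanguageScope.FLOWFILE_ATTRIBUTES`), so that the stage name can be resolved per FlowFile?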
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+ @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+ description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
+ + " This processor can be connected to an StartSnowflakeIngest processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+ static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new PropertyDescriptor.Builder()
+ .name("snowflake-connection-provider")
+ .displayName("Snowflake Connection Provider")
+ .description("Specifies the Controller Service to use for creating SQL connections to Snowflake.")
+ .identifiesControllerService(SnowflakeConnectionProviderService.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor INTERNAL_STAGE_NAME = new PropertyDescriptor.Builder()
+ .name("internal-stage-name")
+ .displayName("Internal Stage Name")
+ .description("The name of the internal stage in the Snowflake account to put files into.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("For FlowFiles of successful PUT operation")
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("For FlowFiles of failed PUT operation")
+ .build();
+
+ static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ SNOWFLAKE_CONNECTION_PROVIDER,
+ INTERNAL_STAGE_NAME
+ ));
+
+ static final Set<Relationship> RELATIONSHIPS;
+
+ static {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+ }
Review Comment:
```suggestion
    static final Set<Relationship> RELATIONSHIPS = Stream.of(REL_SUCCESS, REL_FAILURE)
            .collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
```
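A self-contained sketch of the suggested idiom, including the imports it relies on. Plain strings stand in for NiFi `Relationship` objects here, and the class name `RelationshipSetSketch` is hypothetical; only the `Stream.of(...).collect(collectingAndThen(toSet(), Collections::unmodifiableSet))` expression mirrors the suggestion.

```java
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toSet;

import java.util.Collections;
import java.util.Set;
import java.util.stream.Stream;

class RelationshipSetSketch {
    // Strings stand in for Relationship instances so the sketch has no NiFi dependency.
    static final String REL_SUCCESS = "success";
    static final String REL_FAILURE = "failure";

    // Collect into a set, then wrap it so that callers cannot modify it.
    static final Set<String> RELATIONSHIPS = Stream.of(REL_SUCCESS, REL_FAILURE)
            .collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
}
```

This replaces the static initializer block with a single expression; the resulting set still rejects modification attempts with an `UnsupportedOperationException`.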
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "filename", description = "The name of the staged file in the internal stage"),
+ @ReadsAttribute(attribute = "path", description = "The relative path to the staged file in the internal stage")
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+ description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
Review Comment:
```suggestion
@CapabilityDescription("Puts files into a Snowflake internal stage. The internal stage must be created in the Snowflake account beforehand."
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus/additionalDetails.html:
##########
@@ -0,0 +1,48 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="utf-8"/>
+ <title>GetSnowflakeIngestStatus</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+ <style>
+ h2 {margin-top: 4em}
+ h3 {margin-top: 3em}
+ td {text-align: left}
+ </style>
+</head>
+<body>
+
+<h1>GetSnowflakeIngestStatus</h1>
+
+<h3>Description</h3>
+<p>
+ The GetSnowflakeIngestStatus processor can be used to get the status of a staged file ingested by a Snowflake pipe.
+ To wait until a staged file is fully ingested (copied into the table) you should connect this processor's "retry" relationship to itself.
+ The processor requires an upstream connection that provides the path of the staged file to be checked through the "snowflake.staged.file.path" attribute.
+ See StartSnowflakeIngest processor for details about how to properly set up a flow to ingest staged files.
+ <b>
+ NOTE: Snowflake pipes caches the ingested files' path and never ingests the same file multiple times.
Review Comment:
```suggestion
NOTE: Snowflake pipes cache the paths of ingested files and never ingest the same file more than once.
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus/additionalDetails.html:
##########
@@ -0,0 +1,48 @@
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="utf-8"/>
+ <title>GetSnowflakeIngestStatus</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+ <style>
+ h2 {margin-top: 4em}
+ h3 {margin-top: 3em}
+ td {text-align: left}
+ </style>
+</head>
+<body>
+
+<h1>GetSnowflakeIngestStatus</h1>
+
+<h3>Description</h3>
+<p>
+ The GetSnowflakeIngestStatus processor can be used to get the status of a staged file ingested by a Snowflake pipe.
+ To wait until a staged file is fully ingested (copied into the table) you should connect this processor's "retry" relationship to itself.
+ The processor requires an upstream connection that provides the path of the staged file to be checked through the "snowflake.staged.file.path" attribute.
+ See StartSnowflakeIngest processor for details about how to properly set up a flow to ingest staged files.
+ <b>
+ NOTE: Snowflake pipes caches the ingested files' path and never ingests the same file multiple times.
+ This can lead to the processor being stuck in an "infinite loop" with a FlowFile that has the same "snowflake.staged.file.path" as a stage file that has been
+ already ingested by the pipe. It's recommended to set up the retry mechanism in a way that avoids these situations.
Review Comment:
```suggestion
This can cause the processor to enter an "infinite loop" with a FlowFile that has the same "snowflake.staged.file.path" attribute as a staged file that has previously been ingested by the pipe. It is recommended that the retry mechanism be configured to avoid these scenarios.
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.StartSnowflakeIngest/additionalDetails.html:
##########
@@ -0,0 +1,52 @@
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="utf-8"/>
+ <title>StartSnowflakeIngest</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+ <style>
+ h2 {margin-top: 4em}
+ h3 {margin-top: 3em}
+ td {text-align: left}
+ </style>
+</head>
+<body>
+
+<h1>StartSnowflakeIngest</h1>
+
+<h3>Description</h3>
+<p>
+ The StartSnowflakeIngest processor triggers a Snowflake pipe ingestion for a staged file. The pipe has to be set up in your Snowflake account.
+ The processor requires an upstream connection that provides the path of the file to be ingested in the stage through the "snowflake.staged.file.path" attribute.
+ This attribute is automatically filled in by the PutSnowflakeInternalStage when you are using an internal stage.
Review Comment:
```suggestion
This attribute is automatically filled in by the PutSnowflakeInternalStage processor when using an internal stage.
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/common/Attributes.java:
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake.common;
+
+public class Attributes {
Review Comment:
Could you please make this class final and add a private constructor to
prevent it from being extended and instantiated?
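A minimal sketch of the requested shape. The class name `AttributesSketch` is hypothetical, and the constant's value is an assumption inferred from the "snowflake.staged.file.path" attribute named in the processor documentation; the actual body of `Attributes` is not shown in this hunk.

```java
// 'final' prevents subclassing of the constants holder.
final class AttributesSketch {

    // Assumed value, taken from the attribute name used in the processor docs.
    static final String ATTRIBUTE_STAGED_FILE_PATH = "snowflake.staged.file.path";

    // The private constructor prevents instantiation from outside the class.
    private AttributesSketch() {
    }
}
```

With both changes the class can only be used through its static members, which matches how the processors consume it via a static import.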
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.StartSnowflakeIngest/additionalDetails.html:
##########
@@ -0,0 +1,52 @@
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="utf-8"/>
+ <title>StartSnowflakeIngest</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+ <style>
+ h2 {margin-top: 4em}
+ h3 {margin-top: 3em}
+ td {text-align: left}
+ </style>
+</head>
+<body>
+
+<h1>StartSnowflakeIngest</h1>
+
+<h3>Description</h3>
+<p>
+ The StartSnowflakeIngest processor triggers a Snowflake pipe ingestion for a staged file. The pipe has to be set up in your Snowflake account.
+ The processor requires an upstream connection that provides the path of the file to be ingested in the stage through the "snowflake.staged.file.path" attribute.
+ This attribute is automatically filled in by the PutSnowflakeInternalStage when you are using an internal stage.
+ In case your pipe copies data from an external stage, you need to provide this attribute yourself (e.g. with an UpdateAttribute processor).
Review Comment:
```suggestion
If a pipe copies data from an external stage, the attribute must be provided manually (e.g. with an UpdateAttribute processor).
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.StartSnowflakeIngest/additionalDetails.html:
##########
@@ -0,0 +1,52 @@
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="utf-8"/>
+ <title>StartSnowflakeIngest</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+ <style>
+ h2 {margin-top: 4em}
+ h3 {margin-top: 3em}
+ td {text-align: left}
+ </style>
+</head>
+<body>
+
+<h1>StartSnowflakeIngest</h1>
+
+<h3>Description</h3>
+<p>
+ The StartSnowflakeIngest processor triggers a Snowflake pipe ingestion for a staged file. The pipe has to be set up in your Snowflake account.
+ The processor requires an upstream connection that provides the path of the file to be ingested in the stage through the "snowflake.staged.file.path" attribute.
+ This attribute is automatically filled in by the PutSnowflakeInternalStage when you are using an internal stage.
+ In case your pipe copies data from an external stage, you need to provide this attribute yourself (e.g. with an UpdateAttribute processor).
+ <b>
+ NOTE: A Snowflake pipe doesn't ingest a file synchronously, so this processor transfers a FlowFile to the "success" relationship when the file is marked for ingestion.
+ You need to wait for the actual result of the ingestion through other measures. E.g. you can connect this processor to a downstream GetSnowflakeIngestStatus processor to wait for the results.
Review Comment:
```suggestion
NOTE: Since Snowflake pipes ingest files asynchronously, this processor transfers FlowFiles to the "success" relationship when they're marked for ingestion.
In order to wait for the actual result of the ingestion, the processor may be connected to a downstream GetSnowflakeIngestStatus processor.
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage/additionalDetails.html:
##########
@@ -0,0 +1,42 @@
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="utf-8"/>
+ <title>PutSnowflakeInternalStage</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+ <style>
+ h2 {margin-top: 4em}
+ h3 {margin-top: 3em}
+ td {text-align: left}
+ </style>
+</head>
+<body>
+
+<h1>PutSnowflakeInternalStage</h1>
+
+<h3>Description</h3>
+<p>
+ The PutSnowflakeInternalStage processor can upload a file to a Snowflake internal stage. This stage needs to be set up in your Snowflake account.
Review Comment:
We could link to the Snowflake documentation on setting up a stage. Can it be any type of internal stage?
https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+
+package org.apache.nifi.processors.snowflake;
+
+import static org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "filename", description = "The name of the
staged file in the internal stage"),
+ @ReadsAttribute(attribute = "path", description = "The relative path
to the staged file in the internal stage")
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+ description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The
internal stage must be created in the Snowflake account beforehand."
+ + " This processor can be connected to an StartSnowflakeIngest
processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+ static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("snowflake-connection-provider")
+ .displayName("Snowflake Connection Provider")
+ .description("Specifies the Controller Service to use for creating
SQL connections to Snowflake.")
+
.identifiesControllerService(SnowflakeConnectionProviderService.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor INTERNAL_STAGE_NAME = new
PropertyDescriptor.Builder()
+ .name("internal-stage-name")
+ .displayName("Internal Stage Name")
+ .description("The name of the internal stage in the Snowflake
account to put files into.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("For FlowFiles of successful PUT operation")
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("For FlowFiles of failed PUT operation")
+ .build();
+
+ static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
+ SNOWFLAKE_CONNECTION_PROVIDER,
+ INTERNAL_STAGE_NAME
+ ));
+
+ static final Set<Relationship> RELATIONSHIPS;
+
+ static {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ final String internalStageName =
context.getProperty(INTERNAL_STAGE_NAME)
+ .evaluateAttributeExpressions()
+ .getValue();
+ final SnowflakeConnectionProviderService connectionProviderService =
+ context.getProperty(SNOWFLAKE_CONNECTION_PROVIDER)
+
.asControllerService(SnowflakeConnectionProviderService.class);
+
+ FlowFile flowFile = session.get();
+ final String fileName =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final String relativePath =
flowFile.getAttribute(CoreAttributes.PATH.key());
+ final String stageRelativePath = "./".equals(relativePath)
+ ? ""
+ : relativePath;
+ try (final InputStream inputStream = session.read(flowFile);
+ final SnowflakeConnectionWrapper snowflakeConnection =
connectionProviderService.getSnowflakeConnection()) {
+ snowflakeConnection.unwrap()
+ .uploadStream(internalStageName, stageRelativePath,
inputStream, fileName, false);
+ } catch (SQLException e) {
+ getLogger().error("Failed to upload flow content to internal
Snowflake stage", e);
Review Comment:
```suggestion
getLogger().error("Failed to upload flowfile content to internal
Snowflake stage", e);
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/resources/docs/org.apache.nifi.processors.snowflake.PutSnowflakeInternalStage/additionalDetails.html:
##########
@@ -0,0 +1,42 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="utf-8"/>
+ <title>PutSnowflakeInternalStage</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css"/>
+ <style>
+ h2 {margin-top: 4em}
+ h3 {margin-top: 3em}
+ td {text-align: left}
+ </style>
+</head>
+<body>
+
+<h1>PutSnowflakeInternalStage</h1>
+
+<h3>Description</h3>
+<p>
+ The PutSnowflakeInternalStage processor can upload a file to a Snowflake
internal stage. This stage needs to be set up in your Snowflake account.
+ The processor requires an upstream connection and the incoming FlowFiles'
content will be uploaded to the stage. The attributes "filename" and "path" are
used to provide a prefix and file name for your file in the stage.
+ While the processor can be used by itself, it's usually recommended to
connect it to a StartSnowflakeIngest processor to put the uploaded file into
your Snowflake table via a pipe.
Review Comment:
```suggestion
While the processor may be used separately, it's recommended to connect
it to a StartSnowflakeIngest processor so that the uploaded file can be piped
into your Snowflake table.
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/StartSnowflakeIngest.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static
org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.IngestResponseException;
+import net.snowflake.ingest.utils.StagedFileWrapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+ @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description =
"The path to the file in the stage")
+})
+@Tags({"snowflake", "snowpipe", "ingest"})
+@CapabilityDescription("Ingest files in a Snowflake stage. The stage must be
created in the Snowflake account beforehand."
Review Comment:
```suggestion
@CapabilityDescription("Ingests files in a Snowflake stage. The stage must
be created in the Snowflake account beforehand."
```
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static
org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "filename", description = "The name of the
staged file in the internal stage"),
+ @ReadsAttribute(attribute = "path", description = "The relative path
to the staged file in the internal stage")
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+ description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The
internal stage must be created in the Snowflake account beforehand."
+ + " This processor can be connected to an StartSnowflakeIngest
processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+ static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("snowflake-connection-provider")
+ .displayName("Snowflake Connection Provider")
+ .description("Specifies the Controller Service to use for creating
SQL connections to Snowflake.")
+
.identifiesControllerService(SnowflakeConnectionProviderService.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor INTERNAL_STAGE_NAME = new
PropertyDescriptor.Builder()
+ .name("internal-stage-name")
+ .displayName("Internal Stage Name")
+ .description("The name of the internal stage in the Snowflake
account to put files into.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("For FlowFiles of successful PUT operation")
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("For FlowFiles of failed PUT operation")
+ .build();
+
+ static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
+ SNOWFLAKE_CONNECTION_PROVIDER,
+ INTERNAL_STAGE_NAME
+ ));
+
+ static final Set<Relationship> RELATIONSHIPS;
+
+ static {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ final String internalStageName =
context.getProperty(INTERNAL_STAGE_NAME)
+ .evaluateAttributeExpressions()
+ .getValue();
+ final SnowflakeConnectionProviderService connectionProviderService =
+ context.getProperty(SNOWFLAKE_CONNECTION_PROVIDER)
+
.asControllerService(SnowflakeConnectionProviderService.class);
+
+ FlowFile flowFile = session.get();
Review Comment:
I see this pattern for checking the incoming FlowFile all around NiFi
processors. It may be worth considering.
`if (flowFile == null) {
return;
}`
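To illustrate why the guard matters, here is a minimal sketch outside NiFi. `SketchSession` and `GuardedTriggerSketch` are hypothetical stand-ins, not NiFi classes; the only assumption carried over is that `get()` may return null when nothing is queued, in which case the very next attribute lookup would throw.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for ProcessSession: get() returns null when nothing is queued.
final class SketchSession {
    private final Queue<String> queue = new ArrayDeque<>();

    void enqueue(final String flowFile) {
        queue.add(flowFile);
    }

    String get() {
        return queue.poll(); // null if the queue is empty
    }
}

final class GuardedTriggerSketch {
    // Returns the value read from the flow file, or null when there was no input.
    static String onTrigger(final SketchSession session) {
        final String flowFile = session.get();
        if (flowFile == null) {
            return null; // nothing to process; return before dereferencing
        }
        return flowFile.trim(); // stands in for flowFile.getAttribute(...)
    }
}
```

Without the early return, the `trim()` call (the attribute lookup in the real processor) would fail with a NullPointerException on an empty queue.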
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/GetSnowflakeIngestStatus.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static
org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.HistoryResponse;
+import net.snowflake.ingest.connection.HistoryResponse.FileEntry;
+import net.snowflake.ingest.connection.IngestResponseException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Stateful(scopes = Scope.CLUSTER,
+ description = "The 'begin mark' from the response of a history request
is stored to keep track of already requested history time range.")
+@DefaultSettings(penaltyDuration = "5 sec")
+@ReadsAttributes({
+ @ReadsAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH, description =
"The path to the file in the stage")
+})
+@Tags({"snowflake", "snowpipe", "ingest", "history"})
+@CapabilityDescription("Waits until a file in a Snowflake stage is ingested.
The stage must be created in the Snowflake account beforehand."
+ + " This processor is usually connected to an upstream
StartSnowflakeIngest processor to make sure that the file is ingested")
+@SeeAlso({StartSnowflakeIngest.class, PutSnowflakeInternalStage.class})
+public class GetSnowflakeIngestStatus extends AbstractProcessor {
+
+ static final PropertyDescriptor INGEST_MANAGER_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("ingest-manager-provider")
+ .displayName("Ingest Manager Provider")
+ .description("Specifies the Controller Service to use for
ingesting Snowflake staged files.")
+
.identifiesControllerService(SnowflakeIngestManagerProviderService.class)
+ .required(true)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("For FlowFiles of successful ingestion")
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("For FlowFiles of failed ingestion")
+ .build();
+
+ static final Relationship REL_RETRY = new Relationship.Builder()
+ .name("retry")
+ .description("For FlowFiles whose file is still not ingested.
These FlowFiles should be routed back to this processor to try again later")
+ .build();
+
+ static final List<PropertyDescriptor> PROPERTIES =
Collections.singletonList(
+ INGEST_MANAGER_PROVIDER
+ );
+
+ static final Set<Relationship> RELATIONSHIPS;
+
+ static {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_RETRY);
+ relationships.add(REL_FAILURE);
+ RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+ }
Review Comment:
```suggestion
private static final Set<Relationship> RELATIONSHIPS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
REL_RETRY,
REL_FAILURE
)));
```
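For reference, a self-contained sketch of the initialization suggested above. Plain strings stand in for the `Relationship` constants, which is an assumption made only to keep the example runnable outside NiFi. Note the `java.util.Arrays` import: the import list quoted for this file does not appear to include it, so it would need to be added along with the suggestion.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class RelationshipsSketch {
    // Strings are placeholders for REL_SUCCESS, REL_RETRY and REL_FAILURE.
    static final Set<String> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            "success",
            "retry",
            "failure"
    )));

    private RelationshipsSketch() {
    }
}
```

This replaces the static initializer block with a single expression and keeps the set unmodifiable.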
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors/src/main/java/org/apache/nifi/processors/snowflake/PutSnowflakeInternalStage.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.snowflake;
+
+import static
org.apache.nifi.processors.snowflake.common.Attributes.ATTRIBUTE_STAGED_FILE_PATH;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "filename", description = "The name of the
staged file in the internal stage"),
+ @ReadsAttribute(attribute = "path", description = "The relative path
to the staged file in the internal stage")
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = ATTRIBUTE_STAGED_FILE_PATH,
+ description = "The path to the file in the internal stage")
+})
+@Tags({"snowflake", "jdbc", "database", "connection"})
+@CapabilityDescription("Put files into a Snowflake internal stage. The
internal stage must be created in the Snowflake account beforehand."
+ + " This processor can be connected to an StartSnowflakeIngest
processor to ingest the file in the internal stage")
+@SeeAlso({StartSnowflakeIngest.class, GetSnowflakeIngestStatus.class})
+public class PutSnowflakeInternalStage extends AbstractProcessor {
+
+ static final PropertyDescriptor SNOWFLAKE_CONNECTION_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("snowflake-connection-provider")
+ .displayName("Snowflake Connection Provider")
+ .description("Specifies the Controller Service to use for creating
SQL connections to Snowflake.")
+
.identifiesControllerService(SnowflakeConnectionProviderService.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor INTERNAL_STAGE_NAME = new
PropertyDescriptor.Builder()
+ .name("internal-stage-name")
+ .displayName("Internal Stage Name")
+ .description("The name of the internal stage in the Snowflake
account to put files into.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("For FlowFiles of successful PUT operation")
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("For FlowFiles of failed PUT operation")
+ .build();
+
+ static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
+ SNOWFLAKE_CONNECTION_PROVIDER,
+ INTERNAL_STAGE_NAME
+ ));
+
+ static final Set<Relationship> RELATIONSHIPS;
+
+ static {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ final String internalStageName =
context.getProperty(INTERNAL_STAGE_NAME)
+ .evaluateAttributeExpressions()
+ .getValue();
+ final SnowflakeConnectionProviderService connectionProviderService =
+ context.getProperty(SNOWFLAKE_CONNECTION_PROVIDER)
+
.asControllerService(SnowflakeConnectionProviderService.class);
+
+ FlowFile flowFile = session.get();
+ final String fileName =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final String relativePath =
flowFile.getAttribute(CoreAttributes.PATH.key());
+ final String stageRelativePath = "./".equals(relativePath)
+ ? ""
+ : relativePath;
+ try (final InputStream inputStream = session.read(flowFile);
+ final SnowflakeConnectionWrapper snowflakeConnection =
connectionProviderService.getSnowflakeConnection()) {
+ snowflakeConnection.unwrap()
+ .uploadStream(internalStageName, stageRelativePath,
inputStream, fileName, false);
+ } catch (SQLException e) {
+ getLogger().error("Failed to upload flow content to internal
Snowflake stage", e);
Review Comment:
Can we include the internal stage name and filename in the error message?
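Something along these lines, as a rough sketch. `UploadErrorMessageSketch` and `describeFailure` are hypothetical names, and `String.format` is used here only so the example runs on its own; the exact wording and the logger call are up to you.

```java
final class UploadErrorMessageSketch {
    private UploadErrorMessageSketch() {
    }

    // Builds an error message that names both the target stage and the file being uploaded.
    static String describeFailure(final String internalStageName, final String fileName) {
        return String.format(
                "Failed to upload FlowFile content to internal Snowflake stage [%s] as file [%s]",
                internalStageName, fileName);
    }
}
```

With both values in the message, a failed upload can be traced to a specific stage and file without digging through provenance.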
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java:
##########
@@ -55,34 +64,107 @@
description = "Snowflake JDBC driver property name prefixed with
'SENSITIVE.' handled as a sensitive property.")
})
@RequiresInstanceClassLoading
-public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool implements DBCPService {
+public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService,
DBCPService {
Review Comment:
AbstractDBCPConnectionPool is abstract and already implements DBCPService,
so listing DBCPService again here looks redundant.
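A minimal sketch of the inheritance point, using hypothetical stand-in types (`PoolService`, `ProviderService`, `AbstractPool`, `ConcretePool`) rather than the real NiFi classes: a subclass of an abstract class that implements an interface is already an instance of that interface.

```java
// Hypothetical stand-ins for DBCPService and SnowflakeConnectionProviderService.
interface PoolService {
    String name();
}

interface ProviderService {
}

// Stand-in for AbstractDBCPConnectionPool: abstract, and already implements PoolService.
abstract class AbstractPool implements PoolService {
    @Override
    public String name() {
        return "pool";
    }
}

// Only the new interface needs to be listed; PoolService is inherited from AbstractPool.
final class ConcretePool extends AbstractPool implements ProviderService {
}
```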
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/ConnectionUrlFormat.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import static
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ACCOUNT_LOCATOR;
+import static
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ACCOUNT_NAME;
+import static
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_CLOUD_REGION;
+import static
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_CLOUD_TYPE;
+import static
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_ORGANIZATION_NAME;
+import static
org.apache.nifi.snowflake.service.SnowflakeComputingConnectionPool.SNOWFLAKE_URL;
+
+import java.util.stream.Stream;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.controller.ConfigurationContext;
+
+public enum ConnectionUrlFormat implements DescribedValue {
+ FULL_URL("full-url", "Full URL", "Provide connection URL in a single
property") {
+ @Override
+ public String buildConnectionUrl(final ConfigurationContext context) {
+ String snowflakeUrl =
context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue();
+ if (!snowflakeUrl.startsWith("jdbc:snowflake")) {
Review Comment:
Can "jdbc:snowflake" be extracted to Attributes as SNOWFLAKE_JDBC_PREFIX to
avoid duplication?
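Roughly what I have in mind, as a sketch. The holder class name `SnowflakeUrlSketch` and the helper `hasJdbcPrefix` are hypothetical; only the constant name and its value come from the comment above.

```java
final class SnowflakeUrlSketch {
    // Single definition of the prefix, so every check refers to the same literal.
    static final String SNOWFLAKE_JDBC_PREFIX = "jdbc:snowflake";

    private SnowflakeUrlSketch() {
    }

    static boolean hasJdbcPrefix(final String url) {
        return url.startsWith(SNOWFLAKE_JDBC_PREFIX);
    }
}
```

Each place that currently compares against the literal would then reference the constant instead.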
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/util/CommonProperties.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.snowflake.service.util;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public class CommonProperties {
Review Comment:
Would you please make the class final and add a private constructor to it?
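For example, a sketch of the shape I mean. `CommonPropertiesSketch` and its constant are placeholders; the real class would keep its existing PropertyDescriptor fields.

```java
// final: the class cannot be subclassed.
final class CommonPropertiesSketch {
    // Placeholder constant standing in for the shared property descriptors.
    static final String ACCOUNT_LOCATOR_NAME = "account-locator";

    // private constructor: the class only holds constants and cannot be instantiated.
    private CommonPropertiesSketch() {
    }
}
```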
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowflakeComputingConnectionPool.java:
##########
@@ -55,34 +64,107 @@
description = "Snowflake JDBC driver property name prefixed with
'SENSITIVE.' handled as a sensitive property.")
})
@RequiresInstanceClassLoading
-public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool implements DBCPService {
+public class SnowflakeComputingConnectionPool extends
AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService,
DBCPService {
+
+ public static final PropertyDescriptor CONNECTION_URL_FORMAT = new
PropertyDescriptor.Builder()
+ .name("connection-url-format")
+ .displayName("Connection URL Format")
+ .description("The format of the connection URL.")
+ .allowableValues(ConnectionUrlFormat.class)
+ .required(true)
+ .defaultValue(ConnectionUrlFormat.FULL_URL.getValue())
+ .build();
public static final PropertyDescriptor SNOWFLAKE_URL = new
PropertyDescriptor.Builder()
- .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
- .displayName("Snowflake URL")
- .description("Example connection string:
jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]"
+
- " The connection parameters can include db=DATABASE_NAME to avoid
using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
- .build();
+ .fromPropertyDescriptor(AbstractDBCPConnectionPool.DATABASE_URL)
+ .displayName("Snowflake URL")
+ .description("Example connection string:
jdbc:snowflake://[account].[region].snowflakecomputing.com/?[connection_params]"
+
+ " The connection parameters can include db=DATABASE_NAME
to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
+ .required(true)
+ .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.FULL_URL)
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_LOCATOR = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ACCOUNT_LOCATOR)
+ .dependsOn(CONNECTION_URL_FORMAT,
ConnectionUrlFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_CLOUD_REGION = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.CLOUD_REGION)
+ .dependsOn(CONNECTION_URL_FORMAT,
ConnectionUrlFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_CLOUD_TYPE = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.CLOUD_TYPE)
+ .dependsOn(CONNECTION_URL_FORMAT,
ConnectionUrlFormat.ACCOUNT_LOCATOR)
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_ORGANIZATION_NAME = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ORGANIZATION_NAME)
+ .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_NAME = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CommonProperties.ACCOUNT_NAME)
+ .dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
+ .build();
public static final PropertyDescriptor SNOWFLAKE_USER = new
PropertyDescriptor.Builder()
- .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
- .displayName("Snowflake User")
- .description("The Snowflake user name")
- .build();
+ .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_USER)
+ .displayName("Snowflake User")
+ .description("The Snowflake user name.")
+ .required(true)
+ .build();
public static final PropertyDescriptor SNOWFLAKE_PASSWORD = new
PropertyDescriptor.Builder()
- .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_PASSWORD)
- .displayName("Snowflake Password")
- .description("The password for the Snowflake user")
- .build();
+ .fromPropertyDescriptor(AbstractDBCPConnectionPool.DB_PASSWORD)
+ .displayName("Snowflake Password")
+ .description("The password for the Snowflake user.")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_DATABASE = new
PropertyDescriptor.Builder()
+ .name("database")
+ .displayName("Database")
+ .description("The database to use by default. The same as passing 'db=DATABASE_NAME' to the connection string.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_SCHEMA = new
PropertyDescriptor.Builder()
+ .name("schema")
+ .displayName("Schema")
+ .description("The schema to use by default. The same as passing 'schema=SCHEMA' to the connection string.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("PUBLIC")
+ .build();
+
+ public static final PropertyDescriptor SNOWFLAKE_WAREHOUSE = new
PropertyDescriptor.Builder()
+ .name("warehouse")
+ .displayName("Warehouse")
+ .description("The warehouse to use by default. The same as passing 'warehouse=WAREHOUSE' to the connection string.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
private static final List<PropertyDescriptor> PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(CONNECTION_URL_FORMAT);
props.add(SNOWFLAKE_URL);
+ props.add(SNOWFLAKE_ACCOUNT_LOCATOR);
+ props.add(SNOWFLAKE_CLOUD_REGION);
+ props.add(SNOWFLAKE_CLOUD_TYPE);
+ props.add(SNOWFLAKE_ORGANIZATION_NAME);
+ props.add(SNOWFLAKE_ACCOUNT_NAME);
props.add(SNOWFLAKE_USER);
props.add(SNOWFLAKE_PASSWORD);
+ props.add(SNOWFLAKE_DATABASE);
+ props.add(SNOWFLAKE_SCHEMA);
+ props.add(SNOWFLAKE_WAREHOUSE);
+ props.add(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE);
props.add(VALIDATION_QUERY);
props.add(MAX_WAIT_TIME);
props.add(MAX_TOTAL_CONNECTIONS);
Review Comment:
```suggestion
private static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
CONNECTION_URL_FORMAT,
SNOWFLAKE_URL,
SNOWFLAKE_ACCOUNT_LOCATOR,
SNOWFLAKE_CLOUD_REGION,
SNOWFLAKE_CLOUD_TYPE,
SNOWFLAKE_ORGANIZATION_NAME,
SNOWFLAKE_ACCOUNT_NAME,
SNOWFLAKE_USER,
SNOWFLAKE_PASSWORD,
SNOWFLAKE_DATABASE,
SNOWFLAKE_SCHEMA,
SNOWFLAKE_WAREHOUSE,
ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
VALIDATION_QUERY,
MAX_WAIT_TIME,
MAX_TOTAL_CONNECTIONS,
MIN_IDLE,
MAX_IDLE,
MAX_CONN_LIFETIME,
EVICTION_RUN_PERIOD,
MIN_EVICTABLE_IDLE_TIME,
SOFT_MIN_EVICTABLE_IDLE_TIME
));
```
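
For context on why I prefer this form, here is a small self-contained sketch. It uses plain strings in place of PropertyDescriptor instances, and the class and method names are hypothetical. It shows that the list is built in one expression and that later modification attempts are rejected.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Stand-in for the service class; strings replace the PropertyDescriptor constants.
final class PropertyListSketch {

    // Built once in a single expression; no static initializer block is needed.
    static final List<String> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            "connection-url-format",
            "database",
            "schema",
            "warehouse"
    ));

    private PropertyListSketch() {
    }

    // Returns true if the list refuses an attempt to add an element.
    static boolean rejectsModification() {
        try {
            PROPERTIES.add("extra");
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }
}
```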
##########
nifi-nar-bundles/nifi-snowflake-bundle/nifi-snowflake-processors-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,5 @@
+nifi-snowflake-processors-nar
+Copyright 2015-2020 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
Review Comment:
Is this one empty on purpose?