exceptionfactory commented on code in PR #6601:
URL: https://github.com/apache/nifi/pull/6601#discussion_r1014204753


##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various 
storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {

Review Comment:
   The general naming convention for Processors is verb-noun, so recommend 
renaming this to something like `UpdateDeltaLakeTable`.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various 
storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new 
AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", 
"Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new 
AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new 
PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files 
stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new 
PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has 
to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            
.addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new 
PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new 
PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet 
files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new 
PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new 
PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet 
files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new 
PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, 
path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new 
PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")

Review Comment:
   Property Display Names should capitalize most words.
   ```suggestion
               .displayName("GCP Bucket URL")
   ```



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various 
storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new 
AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", 
"Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new 
AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new 
PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files 
stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new 
PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has 
to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            
.addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new 
PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new 
PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet 
files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new 
PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new 
PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet 
files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new 
PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, 
path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)

Review Comment:
   This property could use the Resource References feature to describe support 
for a file path.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/service/DeltaLakeService.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.service;
+
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.RemoveFile;
+import io.delta.standalone.types.StructType;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.deltalake.storage.StorageAdapter;
+import org.apache.nifi.processors.deltalake.storage.StorageAdapterFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.STRUCTURE_JSON;
+
+public class DeltaLakeService {

Review Comment:
   Recommend defining and interface and a Standard implementation to clarify 
the public contract for this service class.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various 
storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new 
AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", 
"Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new 
AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new 
PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files 
stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new 
PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has 
to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            
.addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new 
PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new 
PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet 
files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new 
PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new 
PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet 
files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new 
PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, 
path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new 
PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new 
PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet 
files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new 
PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in 
json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new 
HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = 
Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, 
STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, 
AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));

Review Comment:
   Recommend reformatting as one property per line for easier readability and 
maintenance.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various 
storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new 
AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", 
"Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new 
AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new 
PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files 
stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new 
PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has 
to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            
.addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new 
PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new 
PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet 
files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new 
PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new 
PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet 
files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new 
PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, 
path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new 
PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new 
PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet 
files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new 
PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in 
json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new 
HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = 
Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, 
STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, 
AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        try {
+            Map<String, String> updateResult = updateDeltaLake();
+            session.putAllAttributes(flowFile, updateResult);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(flowFile, "delta table update failed with 
error", e.getMessage());

Review Comment:
   In general, it is best to avoid including error messages as attributes. If 
there are specific types of exceptions that provide error codes, that could be 
useful. The exception stack trace should be logged as an error as 
@krisztina-zsihovszki noted.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various 
storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new 
AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", 
"Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new 
AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new 
PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files 
stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new 
PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has 
to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            
.addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new 
PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new 
PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet 
files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new 
PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new 
PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet 
files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new 
PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, 
path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new 
PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new 
PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet 
files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new 
PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in 
json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new 
HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = 
Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, 
STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, 
AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.create();
+        try {
+            Map<String, String> updateResult = updateDeltaLake();
+            session.putAllAttributes(flowFile, updateResult);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            session.putAttribute(flowFile, "delta table update failed with 
error", e.getMessage());
+            session.transfer(flowFile, REL_FAILED);
+            context.yield();

Review Comment:
   This yield is not necessary.



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various 
storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new 
AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", 
"Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new 
AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new 
PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files 
stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new 
PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has 
to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            
.addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new 
PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new 
PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet 
files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new 
PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new 
PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet 
files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new 
PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, 
path has to contain the filename")

Review Comment:
   ```suggestion
               .description("Local filesystem path to GCP account JSON keyfile, 
path has to contain the filename")
   ```



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/storage/GcpStorageAdapter.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.storage;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import io.delta.standalone.DeltaLog;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.nifi.processor.ProcessContext;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static 
org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.GCP_ACCOUNT_JSON_KEYFILE_PATH;
+import static 
org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.GCP_BUCKET;
+import static 
org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.GCP_PATH;
+
+public class GcpStorageAdapter implements StorageAdapter {
+
+    private FileSystem fileSystem;
+    private DeltaLog deltaLog;
+    private String dataPath;
+    private String engineInfo;
+
+    public GcpStorageAdapter(ProcessContext processorContext, String 
engineInfo) {
+        this.engineInfo = engineInfo;
+
+        String accountJsonKeyPath = 
processorContext.getProperty(GCP_ACCOUNT_JSON_KEYFILE_PATH).getValue();
+        URI gcpBucketUri = 
URI.create(processorContext.getProperty(GCP_BUCKET).getValue());
+        String gcpPath = processorContext.getProperty(GCP_PATH).getValue();
+
+        Configuration configuration = createConfiguration(accountJsonKeyPath);
+        fileSystem = new GoogleHadoopFileSystem();
+        try {
+            fileSystem.initialize(gcpBucketUri, configuration);
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   See above note on adding a message to the exception. In this case, an 
`UncheckedIOException` is a better fit than `RuntimeException`:
   ```suggestion
               throw new UncheckedIOException(String.format("GCP Filesystem 
Bucket URI [%s] initialization failed", gcpBucketUri), e);
   ```



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/storage/LocalStorageAdapter.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake.storage;
+
+import io.delta.standalone.DeltaLog;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.processor.ProcessContext;
+
+import java.io.IOException;
+
+import static 
org.apache.nifi.processors.deltalake.DeltaLakeMetadataWriter.LOCAL_PATH;
+
+public class LocalStorageAdapter implements StorageAdapter {
+
+    private FileSystem fileSystem;
+    private DeltaLog deltaLog;
+    private String dataPath;
+    private String engineInfo;
+
+    public LocalStorageAdapter(ProcessContext processorContext, String 
engineInfo) {
+        this.engineInfo = engineInfo;
+
+        dataPath = processorContext.getProperty(LOCAL_PATH).getValue();
+        Path source = new Path(dataPath);
+
+        Configuration configuration = new Configuration();
+        try {
+            fileSystem = source.getFileSystem(configuration);
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   ```suggestion
               throw new UncheckedIOException(String.format("Local Storage FS 
[%s] configuration failed", dataPath), e);
   ```



##########
nifi-nar-bundles/nifi-deltalake-bundle/nifi-deltalake-processors/src/main/java/org/apache/nifi/processors/deltalake/DeltaLakeMetadataWriter.java:
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deltalake;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.deltalake.service.DeltaLakeService;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@TriggerSerially
+@Tags({"deltalake", "deltatable", "cloud", "storage", "parquet", "writer"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Creates or updates a Delta table. Supports various 
storage solutions.")
+public class DeltaLakeMetadataWriter extends AbstractProcessor {
+
+    public static final AllowableValue LOCAL_FILESYSTEM = new 
AllowableValue("LOCAL", "Local Filesystem",
+            "The parquet files stored on the local filesystem.");
+
+    public static final AllowableValue AMAZON_S3 = new AllowableValue("S3", 
"Amazon S3",
+            "The parquet files stored in a AWS S3 bucket.");
+
+    public static final AllowableValue MICROSOFT_AZURE = new 
AllowableValue("AZURE", "Microsoft Azure",
+            "The parquet files stored in a Microsoft Azure blob.");
+
+    public static final AllowableValue GCP = new AllowableValue("GCP", "GCP",
+            "The parquet files stored in a GCP bucket.");
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    public static final PropertyDescriptor STORAGE_SELECTOR = new 
PropertyDescriptor.Builder()
+            .name("storage-location")
+            .displayName("Storage Location")
+            .description("Choose storage provider where the parquet files 
stored")
+            .allowableValues(LOCAL_FILESYSTEM, AMAZON_S3, MICROSOFT_AZURE, GCP)
+            .defaultValue(LOCAL_FILESYSTEM.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor LOCAL_PATH = new 
PropertyDescriptor.Builder()
+            .name("local-path")
+            .displayName("Local filesystem path")
+            .description("Path on the local file system, can be absolute(has 
to start with '/') or relative path")
+            .dependsOn(STORAGE_SELECTOR, LOCAL_FILESYSTEM)
+            
.addValidator(StandardValidators.createDirectoryExistsValidator(false, false))
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_ACCESS_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-access-key")
+            .displayName("S3 Access Key")
+            .description("The access key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_SECRET_KEY = new 
PropertyDescriptor.Builder()
+            .name("s3-secret-key")
+            .displayName("S3 Secret Key")
+            .description("The secret key for Amazon S3")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor S3_BUCKET = new 
PropertyDescriptor.Builder()
+            .name("s3-bucket-url")
+            .displayName("S3 bucket url")
+            .description("The bucket url in S3, has to start with s3a://")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor S3_PATH = new 
PropertyDescriptor.Builder()
+            .name("s3-data-path")
+            .displayName("Data path in S3 bucket")
+            .description("The path to the directory containing the parquet 
files within the S3 bucket")
+            .dependsOn(STORAGE_SELECTOR, AMAZON_S3)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_ACCOUNT_KEY = new 
PropertyDescriptor.Builder()
+            .name("azure-account-key")
+            .displayName("Azure Account Key")
+            .description("The account key for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_NAME = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-name")
+            .displayName("Azure blob storage name")
+            .description("The storage name of the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_STORAGE_ACCOUNT = new 
PropertyDescriptor.Builder()
+            .name("azure-storage-account")
+            .displayName("Azure blob storage account")
+            .description("The storage account for the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor AZURE_PATH = new 
PropertyDescriptor.Builder()
+            .name("azure-data-path")
+            .displayName("Data path in Azure blob")
+            .description("The path to the directory containing the parquet 
files within the Azure blob")
+            .dependsOn(STORAGE_SELECTOR, MICROSOFT_AZURE)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_ACCOUNT_JSON_KEYFILE_PATH = new 
PropertyDescriptor.Builder()
+            .name("gcp-keyfile-path")
+            .displayName("GCP account json keyfile path")
+            .description("Local filesystem path to GCP account json keyfile, 
path has to contain the filename")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_BUCKET = new 
PropertyDescriptor.Builder()
+            .name("gcp-bucket-url")
+            .displayName("GCP bucket url")
+            .description("The GCP bucket url, has to starts with gs://")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor GCP_PATH = new 
PropertyDescriptor.Builder()
+            .name("gcp-data-path")
+            .displayName("Data path in GCP bucket")
+            .description("The path to the directory containing the parquet 
files within the GCP bucket")
+            .dependsOn(STORAGE_SELECTOR, GCP)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor STRUCTURE_JSON = new 
PropertyDescriptor.Builder()
+            .name("parquet-structure")
+            .displayName("Parquet structure in json")
+            .description("Describes the data structure of the parquet file in 
json format")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("DeltaLake table successfully updated")
+            .build();
+
+    public static final Relationship REL_FAILED = new Relationship.Builder()
+            .name("failure")
+            .description("DeltaLake table update failed")
+            .build();
+
+    private DeltaLakeService deltalakeService;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.relationships = Collections.unmodifiableSet(new 
HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILED)));
+        this.descriptors = 
Collections.unmodifiableList(Arrays.asList(STORAGE_SELECTOR, LOCAL_PATH, 
STRUCTURE_JSON,
+                S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, S3_PATH,
+                AZURE_ACCOUNT_KEY, AZURE_STORAGE_NAME, AZURE_STORAGE_ACCOUNT, 
AZURE_PATH,
+                GCP_ACCOUNT_JSON_KEYFILE_PATH, GCP_BUCKET, GCP_PATH));
+        deltalakeService = new DeltaLakeService();

Review Comment:
   This should be removed to a method annotated with `OnScheduled`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to