This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b60ef97c95 [Improve] filestore options (#8921)
b60ef97c95 is described below
commit b60ef97c95158d64628f83bfec479aa48ef2852a
Author: Jarvis <[email protected]>
AuthorDate: Fri Mar 7 19:21:30 2025 +0800
[Improve] filestore options (#8921)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 1 -
.../firestore/config/FirestoreParameters.java | 12 +++---
...estoreConfig.java => FirestoreSinkOptions.java} | 2 +-
.../google/firestore/sink/FirestoreSink.java | 48 ++++------------------
.../firestore/sink/FirestoreSinkFactory.java | 17 ++++++--
.../GoogleFirestoreIT.java | 8 ++--
6 files changed, 34 insertions(+), 54 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 90186e9a81..72b5b09c51 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -205,7 +205,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("SocketSinkOptions");
whiteList.add("SelectDBSinkOptions");
whiteList.add("PrometheusSinkOptions");
- whiteList.add("FirestoreSinkOptions");
whiteList.add("MilvusSinkOptions");
whiteList.add("RocketMqSourceOptions");
whiteList.add("TablestoreSinkOptions");
diff --git
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreParameters.java
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreParameters.java
index bcc67a6244..21941165c3 100644
---
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreParameters.java
+++
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreParameters.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.google.firestore.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.Data;
@@ -32,11 +32,11 @@ public class FirestoreParameters implements Serializable {
private String collection;
- public FirestoreParameters buildWithConfig(Config config) {
- this.projectId = config.getString(FirestoreConfig.PROJECT_ID.key());
- this.collection = config.getString(FirestoreConfig.COLLECTION.key());
- if (config.hasPath(FirestoreConfig.CREDENTIALS.key())) {
- this.credentials =
config.getString(FirestoreConfig.CREDENTIALS.key());
+ public FirestoreParameters buildWithConfig(ReadonlyConfig config) {
+ this.projectId = config.get(FirestoreSinkOptions.PROJECT_ID);
+ this.collection = config.get(FirestoreSinkOptions.COLLECTION);
+ if (config.getOptional(FirestoreSinkOptions.CREDENTIALS).isPresent()) {
+ this.credentials = config.get(FirestoreSinkOptions.CREDENTIALS);
}
return this;
}
diff --git
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreConfig.java
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreSinkOptions.java
similarity index 97%
rename from
seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreConfig.java
rename to
seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreSinkOptions.java
index eeb4556745..493456fc64 100644
---
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreConfig.java
+++
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/config/FirestoreSinkOptions.java
@@ -20,7 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.google.firestore.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-public class FirestoreConfig {
+public class FirestoreSinkOptions {
public static final Option<String> PROJECT_ID =
Options.key("project_id")
diff --git
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
index 6149ba9358..cd264f6eef 100644
---
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
+++
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
@@ -17,70 +17,40 @@
package org.apache.seatunnel.connectors.seatunnel.google.firestore.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreParameters;
-import
org.apache.seatunnel.connectors.seatunnel.google.firestore.exception.FirestoreConnectorException;
-
-import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.COLLECTION;
-import static
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.PROJECT_ID;
-
-@AutoService(SeaTunnelSink.class)
public class FirestoreSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
- private SeaTunnelRowType rowType;
+ private final CatalogTable catalogTable;
- private FirestoreParameters firestoreParameters;
+ private final FirestoreParameters firestoreParameters;
- @Override
- public String getPluginName() {
- return "GoogleFirestore";
+ public FirestoreSink(CatalogTable catalogTable, FirestoreParameters
firestoreParameters) {
+ this.catalogTable = catalogTable;
+ this.firestoreParameters = firestoreParameters;
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(pluginConfig, PROJECT_ID.key(),
COLLECTION.key());
- if (!result.isSuccess()) {
- throw new FirestoreConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- this.firestoreParameters = new
FirestoreParameters().buildWithConfig(pluginConfig);
- }
-
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.rowType = seaTunnelRowType;
+ public String getPluginName() {
+ return "GoogleFirestore";
}
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
- return new FirestoreSinkWriter(rowType, firestoreParameters);
+ return new FirestoreSinkWriter(catalogTable.getSeaTunnelRowType(),
firestoreParameters);
}
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
+ return Optional.ofNullable(catalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSinkFactory.java
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSinkFactory.java
index 0ee2120475..76b3496800 100644
---
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSinkFactory.java
@@ -18,14 +18,17 @@
package org.apache.seatunnel.connectors.seatunnel.google.firestore.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreParameters;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.COLLECTION;
-import static
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.CREDENTIALS;
-import static
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.PROJECT_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreSinkOptions.COLLECTION;
+import static
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreSinkOptions.CREDENTIALS;
+import static
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreSinkOptions.PROJECT_ID;
@AutoService(Factory.class)
public class FirestoreSinkFactory implements TableSinkFactory {
@@ -39,4 +42,12 @@ public class FirestoreSinkFactory implements
TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder().required(PROJECT_ID,
COLLECTION).optional(CREDENTIALS).build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () ->
+ new FirestoreSink(
+ context.getCatalogTable(),
+ new
FirestoreParameters().buildWithConfig(context.getOptions()));
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-google-firestore-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/GoogleFirestoreIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-google-firestore-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/GoogleFirestoreIT.java
index fe72624d19..2dad5db469 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-google-firestore-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/GoogleFirestoreIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-google-firestore-e2e/src/test/java/org.apache.seatunnel.e2e.connector.google.firestore/GoogleFirestoreIT.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.e2e.connector.google.firestore;
-import
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig;
+import
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreSinkOptions;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
@@ -86,9 +86,9 @@ public class GoogleFirestoreIT extends TestSuiteBase
implements TestResource {
File file = ContainerUtil.getResourcesFile(FIRESTORE_CONF_FILE);
Config config = ConfigFactory.parseFile(file);
Config firestoreConfig =
config.getConfig("sink").getConfig("GoogleFirestore");
- this.projectId =
firestoreConfig.getString(FirestoreConfig.PROJECT_ID.key());
- this.collection =
firestoreConfig.getString(FirestoreConfig.COLLECTION.key());
- this.credentials =
firestoreConfig.getString(FirestoreConfig.CREDENTIALS.key());
+ this.projectId =
firestoreConfig.getString(FirestoreSinkOptions.PROJECT_ID.key());
+ this.collection =
firestoreConfig.getString(FirestoreSinkOptions.COLLECTION.key());
+ this.credentials =
firestoreConfig.getString(FirestoreSinkOptions.CREDENTIALS.key());
}
@AfterAll