This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/dev-offline-sync by this push:
new bf38d2a3aa [INLONG-10560][Sort] Support bounded pulsar source (#10569)
bf38d2a3aa is described below
commit bf38d2a3aa1f5d8663fb4797ab9bad9ca787a7ab
Author: AloysZhang <[email protected]>
AuthorDate: Fri Jul 5 16:21:03 2024 +0800
[INLONG-10560][Sort] Support bounded pulsar source (#10569)
Co-authored-by: Aloys Zhang <[email protected]>
---
.../apache/inlong/common/bounded/Boundaries.java | 30 ++++++++++++
.../apache/inlong/common/bounded/BoundaryType.java | 56 ++++++++++++++++++++++
.../inlong/sort/configuration/Constants.java | 15 ++++++
.../inlong/sort/protocol/node/ExtractNode.java | 7 +++
.../protocol/node/extract/PulsarExtractNode.java | 29 +++++++++++
.../main/java/org/apache/inlong/sort/Entrance.java | 42 ++++++++++++++++
.../sort/pulsar/table/PulsarTableFactory.java | 4 ++
.../sort/pulsar/table/PulsarTableOptions.java | 28 +++++++++++
.../pulsar/table/source/PulsarTableSource.java | 23 +++++----
9 files changed, 225 insertions(+), 9 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/bounded/Boundaries.java
b/inlong-common/src/main/java/org/apache/inlong/common/bounded/Boundaries.java
new file mode 100644
index 0000000000..d3302ab99c
--- /dev/null
+++
b/inlong-common/src/main/java/org/apache/inlong/common/bounded/Boundaries.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.bounded;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class Boundaries {
+
+ public String lowerBound;
+ public String upperBound;
+ public BoundaryType boundaryType;
+}
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/bounded/BoundaryType.java
b/inlong-common/src/main/java/org/apache/inlong/common/bounded/BoundaryType.java
new file mode 100644
index 0000000000..1888bd3d91
--- /dev/null
+++
b/inlong-common/src/main/java/org/apache/inlong/common/bounded/BoundaryType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.bounded;
+
+import lombok.Getter;
+
+/**
+ * Source boundary types.
+ * TIME is the common boundary type should be supported in every bounded
source.
+ * OFFSET is the boundary for MQ type bounded source, like offset in kafka or
messageId in pulsar.
+ * */
+@Getter
+public enum BoundaryType {
+
+ TIME("time"),
+ OFFSET("offset");
+
+ private final String type;
+
+ BoundaryType(String boundaryType) {
+ this.type = boundaryType;
+ }
+
+ public static BoundaryType getInstance(String boundaryType) {
+ for (BoundaryType type : values()) {
+ if (type.getType().equalsIgnoreCase(boundaryType)) {
+ return type;
+ }
+ }
+ return null;
+ }
+
+ public static boolean isSupportBoundaryType(String boundaryType) {
+ for (BoundaryType source : values()) {
+ if (source.getType().equalsIgnoreCase(boundaryType)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index a5717242a5..878b841076 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -299,6 +299,21 @@ public class Constants {
.defaultValue("stream")
.withDescription("The runtime execution mode of Flink, including
stream and batch, default is stream");
+ // ------------------------------------------------------------------------
+ // Source boundary related
+ // ------------------------------------------------------------------------
+ public static final ConfigOption<String> SOURCE_BOUNDARY_TYPE =
key("source.boundary.type")
+ .defaultValue("time")
+ .withDescription("The type of source boundary");
+
+ public static final ConfigOption<String> SOURCE_LOWER_BOUNDARY =
key("source.lower.boundary")
+ .defaultValue("0")
+ .withDescription("The lower bound of source");
+
+ public static final ConfigOption<String> SOURCE_UPPER_BOUNDARY =
key("source.upper.boundary")
+ .defaultValue("0")
+ .withDescription("The upper bound of source");
+
// ------------------------------------------------------------------------
// Metrics related
// ------------------------------------------------------------------------
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index fc68f0f356..55e7b57aae 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.protocol.node;
+import org.apache.inlong.common.bounded.Boundaries;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode;
import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
@@ -106,4 +107,10 @@ public abstract class ExtractNode implements Node {
this.watermarkField = watermarkField;
this.properties = properties;
}
+
+ public void fillInBoundaries(Boundaries boundaries) {
+ Preconditions.checkNotNull(boundaries, "boundaries is null");
+ // every single kind of extract node should provide the way to fill in
boundaries individually
+ }
+
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
index 9a2adcc8e3..ab90ba7d19 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.protocol.node.extract;
+import org.apache.inlong.common.bounded.Boundaries;
+import org.apache.inlong.common.bounded.BoundaryType;
import org.apache.inlong.common.enums.MetaField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.InlongMetric;
@@ -33,13 +35,17 @@ import org.apache.commons.lang3.StringUtils;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
@EqualsAndHashCode(callSuper = true)
@@ -47,6 +53,7 @@ import java.util.Set;
@Data
public class PulsarExtractNode extends ExtractNode implements InlongMetric,
Metadata {
+ private static final Logger log =
LoggerFactory.getLogger(PulsarExtractNode.class);
private static final long serialVersionUID = 1L;
@Nonnull
@@ -89,6 +96,8 @@ public class PulsarExtractNode extends ExtractNode implements
InlongMetric, Meta
@JsonProperty("clientAuthParams")
private String clientAuthParams;
+ Map<String, String> sourceBoundaryOptions = new HashMap<>();
+
@JsonCreator
public PulsarExtractNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@@ -146,11 +155,17 @@ public class PulsarExtractNode extends ExtractNode
implements InlongMetric, Meta
options.put("scan.startup.sub-name", scanStartupSubName);
options.put("scan.startup.sub-start-offset",
scanStartupSubStartOffset);
}
+
if (StringUtils.isNotBlank(clientAuthPluginClassName)
&& StringUtils.isNotBlank(clientAuthParams)) {
options.put("pulsar.client.authPluginClassName",
clientAuthPluginClassName);
options.put("pulsar.client.authParams", clientAuthParams);
}
+
+ // add boundary options
+ if (!sourceBoundaryOptions.isEmpty()) {
+ options.putAll(sourceBoundaryOptions);
+ }
return options;
}
@@ -197,4 +212,18 @@ public class PulsarExtractNode extends ExtractNode
implements InlongMetric, Meta
return EnumSet.of(MetaField.AUDIT_DATA_TIME);
}
+ @Override
+ public void fillInBoundaries(Boundaries boundaries) {
+ super.fillInBoundaries(boundaries);
+ BoundaryType boundaryType = boundaries.getBoundaryType();
+ String lowerBoundary = boundaries.getLowerBound();
+ String upperBoundary = boundaries.getUpperBound();
+ if (Objects.requireNonNull(boundaryType) == BoundaryType.TIME) {
+ sourceBoundaryOptions.put("source.start.publish-time",
lowerBoundary);
+ sourceBoundaryOptions.put("source.stop.at-publish-time",
upperBoundary);
+ log.info("Filled in source boundaries options");
+ } else {
+ log.warn("Not supported boundary type: {}", boundaryType);
+ }
+ }
}
diff --git
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
index 4aa3902cae..a9744f793c 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort;
+import org.apache.inlong.common.bounded.Boundaries;
+import org.apache.inlong.common.bounded.BoundaryType;
import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.configuration.Constants;
import org.apache.inlong.sort.parser.Parser;
@@ -24,6 +26,7 @@ import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
import org.apache.inlong.sort.parser.impl.NativeFlinkSqlParser;
import org.apache.inlong.sort.parser.result.ParseResult;
import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.util.ParameterTool;
import com.google.common.base.Preconditions;
@@ -33,13 +36,20 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import static
org.apache.inlong.sort.configuration.Constants.SOURCE_BOUNDARY_TYPE;
+import static
org.apache.inlong.sort.configuration.Constants.SOURCE_LOWER_BOUNDARY;
+import static
org.apache.inlong.sort.configuration.Constants.SOURCE_UPPER_BOUNDARY;
+
public class Entrance {
+ private static final Logger log = LoggerFactory.getLogger(Entrance.class);
public static final String BATCH_MODE = "batch";
public static void main(String[] args) throws Exception {
@@ -75,6 +85,10 @@ public class Entrance {
groupInfo.getProperties().putIfAbsent(Constants.METRICS_AUDIT_PROXY_HOSTS.key(),
config.getString(Constants.METRICS_AUDIT_PROXY_HOSTS));
}
+
+ // fill in boundaries if needed
+ fillInSourceBoundariesIfNeeded(runtimeExecutionMode, groupInfo,
config);
+
parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
} else {
String statements = getStatementSetFromFile(sqlFile);
@@ -85,6 +99,34 @@ public class Entrance {
parseResult.execute();
}
+ private static void fillInSourceBoundariesIfNeeded(String
runtimeExecutionMode, GroupInfo groupInfo,
+ Configuration configuration) {
+ if (!BATCH_MODE.equalsIgnoreCase(runtimeExecutionMode)) {
+ return;
+ }
+ String type = configuration.getString(SOURCE_BOUNDARY_TYPE);
+ String lowerBoundary = configuration.getString(SOURCE_LOWER_BOUNDARY);
+ String upperBoundary = configuration.getString(SOURCE_UPPER_BOUNDARY);
+
+ log.info("Filling in source boundaries for group: {}, with execution
mode: {}, boundaryType: {}, "
+ + "lowerBoundary: {}, upperBoundary: {}",
+ groupInfo.getGroupId(), runtimeExecutionMode, type,
lowerBoundary, upperBoundary);
+
+ BoundaryType boundaryType = BoundaryType.getInstance(type);
+ if (boundaryType == null) {
+ throw new RuntimeException("Unknown boundary type: " + type);
+ }
+ Boundaries boundaries = new Boundaries(lowerBoundary, upperBoundary,
boundaryType);
+ // add source boundaries for bounded source
+ groupInfo.getStreams().forEach(streamInfo -> {
+ streamInfo.getNodes().forEach(node -> {
+ if (node instanceof ExtractNode) {
+ ((ExtractNode) node).fillInBoundaries(boundaries);
+ }
+ });
+ });
+ }
+
private static String getStatementSetFromFile(String fileName) throws
IOException {
return Files.asCharSource(new File(fileName),
StandardCharsets.UTF_8).read();
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
index 4b982f4691..deb240c7ed 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
@@ -81,11 +81,13 @@ import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_CUSTOM
import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID_DEPRECATED;
import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME_DEPRECATED;
import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.STARTUP_MODE;
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC;
@@ -271,8 +273,10 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory, DynamicTab
ADMIN_URL,
STARTUP_MODE,
SOURCE_SUBSCRIPTION_NAME,
+ SOURCE_SUBSCRIPTION_NAME_DEPRECATED,
SOURCE_SUBSCRIPTION_TYPE,
SOURCE_START_FROM_MESSAGE_ID,
+ SOURCE_START_FROM_MESSAGE_ID_DEPRECATED,
SOURCE_START_FROM_PUBLISH_TIME,
SOURCE_STOP_AT_MESSAGE_ID,
SOURCE_STOP_AFTER_MESSAGE_ID,
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
index bc78dd2d98..93644fecfb 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
@@ -79,6 +79,16 @@ public final class PulsarTableOptions {
* Copied because we want to have a default value for it.
*/
public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
+ ConfigOptions.key("source.subscription-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "The subscription name of the
consumer that is used by the runtime [Pulsar DataStream source
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
This argument is required for constructing the consumer.")
+ .build());
+
+ public static final ConfigOption<String>
SOURCE_SUBSCRIPTION_NAME_DEPRECATED =
ConfigOptions.key("scan.startup.sub-name")
.stringType()
.noDefaultValue()
@@ -89,6 +99,24 @@ public final class PulsarTableOptions {
.build());
public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
+ ConfigOptions.key("source.start.message-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "(Optional) Message id that is
used to specify a consuming starting "
+ + "point for source. Use
%s, %s or pass in a message id "
+ + "representation in %s, "
+ + "such as %s. This option
takes precedence over "
+ +
"source.start.publish-time.",
+ code("earliest"),
+ code("latest"),
+
code("ledgerId:entryId:partitionId"),
+ code("12:2:-1"))
+ .build());
+
+ public static final ConfigOption<String>
SOURCE_START_FROM_MESSAGE_ID_DEPRECATED =
ConfigOptions.key("scan.startup.sub-start-offset")
.stringType()
.noDefaultValue()
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
index 281e3eb5e9..8864ca45c0 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
@@ -19,8 +19,10 @@ package org.apache.inlong.sort.pulsar.table.source;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata;
import org.apache.flink.table.connector.ChangelogMode;
@@ -114,15 +116,18 @@ public class PulsarTableSource implements
ScanTableSource, SupportsReadingMetada
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
PulsarDeserializationSchema<RowData> deserializationSchema =
deserializationSchemaFactory.createPulsarDeserialization(context);
- PulsarSource<RowData> source =
- PulsarSource.builder()
- .setTopics(topics)
- .setStartCursor(startCursor)
- .setUnboundedStopCursor(stopCursor)
- .setDeserializationSchema(deserializationSchema)
- .setProperties(properties)
- .build();
- return SourceProvider.of(source);
+ PulsarSourceBuilder<RowData> sourceBuilder = PulsarSource.builder();
+ sourceBuilder
+ .setTopics(topics)
+ .setStartCursor(startCursor)
+ .setDeserializationSchema(deserializationSchema)
+ .setProperties(properties);
+ if (!(stopCursor instanceof NeverStopCursor)) {
+ sourceBuilder.setBoundedStopCursor(stopCursor);
+ } else {
+ sourceBuilder.setUnboundedStopCursor(stopCursor);
+ }
+ return SourceProvider.of(sourceBuilder.build());
}
/**