This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8e83cee4e [flink] Supports infer parallelism (#584)
8e83cee4e is described below
commit 8e83cee4e4a68ee8198229420599eadd4eaf2bb8
Author: JunZhang <[email protected]>
AuthorDate: Sun Apr 2 15:16:51 2023 +0800
[flink] Supports infer parallelism (#584)
---
.../generated/flink_connector_configuration.html | 8 +-
.../apache/paimon/flink/FlinkConnectorOptions.java | 11 +-
.../flink/source/CompactorSourceBuilder.java | 3 +
.../paimon/flink/source/DataTableSource.java | 43 +++++++-
.../source/FileStoreSourceSplitGenerator.java | 7 ++
.../paimon/flink/source/FlinkSourceBuilder.java | 11 +-
.../paimon/flink/source/StaticFileStoreSource.java | 27 +++--
.../paimon/flink/source/SystemTableSource.java | 6 +-
.../apache/paimon/flink/ReadWriteTableITCase.java | 122 +++++++++++++++++++++
.../paimon/flink/util/ReadWriteTableTestUtil.java | 24 +++-
10 files changed, 244 insertions(+), 18 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 8060955d2..11cfa050b 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -30,7 +30,13 @@
<td><h5>scan.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
- <td>Define a custom parallelism for the scan source. By default,
if this option is not defined, the planner will derive the parallelism for each
statement individually by also considering the global configuration.</td>
+ <td>Define a custom parallelism for the scan source. By default,
if this option is not defined, the planner will derive the parallelism for each
statement individually by also considering the global configuration. If user
enable the scan.infer-parallelism, the planner will derive the parallelism by
inferred parallelism.</td>
+ </tr>
+ <tr>
+ <td><h5>scan.infer-parallelism</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If it is false, parallelism of source are set by
scan.parallelism. Otherwise, source parallelism is inferred from splits number
(batch mode) or bucket number(streaming mode).</td>
</tr>
<tr>
<td><h5>scan.split-enumerator.batch-size</h5></td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index c882ffb60..91dd1b8be 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -83,7 +83,16 @@ public class FlinkConnectorOptions {
.withDescription(
"Define a custom parallelism for the scan source. "
+ "By default, if this option is not
defined, the planner will derive the parallelism "
- + "for each statement individually by also
considering the global configuration.");
+ + "for each statement individually by also
considering the global configuration. "
+ + "If user enable the
scan.infer-parallelism, the planner will derive the parallelism by inferred
parallelism.");
+ public static final ConfigOption<Boolean> INFER_SCAN_PARALLELISM =
+ ConfigOptions.key("scan.infer-parallelism")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If it is false, parallelism of source are set by "
+ + SCAN_PARALLELISM.key()
+ + ". Otherwise, source parallelism is
inferred from splits number (batch mode) or bucket number(streaming mode).");
public static final ConfigOption<Boolean> STREAMING_READ_ATOMIC =
ConfigOptions.key("streaming-read-atomic")
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 9761c4cbe..f49b03309 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -25,6 +25,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.system.BucketsTable;
import org.apache.paimon.types.RowType;
@@ -99,6 +100,7 @@ public class CompactorSourceBuilder {
null,
TableScanUtils.compactStreamScanFactory());
} else {
+ List<Split> splits = readBuilder.newScan().plan().splits();
return new StaticFileStoreSource(
readBuilder,
null,
@@ -106,6 +108,7 @@ public class CompactorSourceBuilder {
.coreOptions()
.toConfiguration()
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
+ splits,
TableScanUtils.compactBatchScanFactory());
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index 6529c8f04..f0de0825a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.LogChangelogMode;
import org.apache.paimon.CoreOptions.LogConsistency;
@@ -33,10 +34,15 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
+import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Projection;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -48,6 +54,7 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import javax.annotation.Nullable;
import java.time.Duration;
+import java.util.List;
import java.util.stream.IntStream;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
@@ -197,13 +204,41 @@ public class DataTableSource extends FlinkTableSource
.withProjection(projectFields)
.withPredicate(predicate)
.withLimit(limit)
- .withParallelism(
- Options.fromMap(table.options())
-
.get(FlinkConnectorOptions.SCAN_PARALLELISM))
.withWatermarkStrategy(watermarkStrategy);
return new PaimonDataStreamScanProvider(
- !streaming, env -> sourceBuilder.withEnv(env).build());
+ !streaming, env -> configureSource(sourceBuilder, env));
+ }
+
+    /**
+     * Applies the scan parallelism to the source builder and builds the source.
+     *
+     * <p>The parallelism starts from {@code scan.parallelism}. When {@code scan.infer-parallelism}
+     * is enabled it is replaced by the bucket number in streaming mode; in batch mode it is the
+     * number of planned splits, capped by a positive limit and never less than 1.
+     *
+     * <p>NOTE(review): the batch scan below is planned without the projection/predicate that the
+     * source builder receives, yet the resulting splits are handed to the source, which prefers
+     * them over planning its own scan. Confirm whether the pushed-down filter should be applied
+     * here as well.
+     *
+     * @param sourceBuilder builder already configured with projection, predicate, limit, etc.
+     * @param env execution environment the source is attached to
+     * @return the built source stream
+     */
+    private DataStreamSource<RowData> configureSource(
+            FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
+        Options options = Options.fromMap(this.table.options());
+        Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
+        List<Split> splits = null;
+        if (options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
+            if (streaming) {
+                // for streaming mode, set the default parallelism to the bucket number.
+                parallelism = options.get(CoreOptions.BUCKET);
+            } else {
+                Preconditions.checkState(table instanceof DataTable);
+                DataTable dataTable = (DataTable) table;
+                splits = dataTable.newScan().plan().splits();
+
+                if (null != splits) {
+                    parallelism = splits.size();
+                }
+                // Guard against a null parallelism (scan.parallelism unset and no splits):
+                // Math.min would otherwise fail while unboxing it.
+                if (null != parallelism && null != limit && limit > 0) {
+                    int limitCount =
+                            limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
+                    parallelism = Math.min(parallelism, limitCount);
+                }
+
+                // An empty table or an invalid configured value still needs one reader.
+                parallelism = null == parallelism ? 1 : Math.max(1, parallelism);
+            }
+        }
+
+        return sourceBuilder.withParallelism(parallelism).withSplits(splits).withEnv(env).build();
     }
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java
index 5f884cc18..f19ec828e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
import java.util.List;
@@ -41,6 +42,12 @@ public class FileStoreSourceSplitGenerator {
.collect(Collectors.toList());
}
+    /**
+     * Wraps already planned table splits into source splits, assigning each one the next
+     * generated id in iteration order.
+     *
+     * @param splits planned table splits
+     * @return one {@link FileStoreSourceSplit} per input split
+     */
+    public List<FileStoreSourceSplit> createSplits(List<Split> splits) {
+        return splits.stream()
+                .map(
+                        split -> {
+                            String splitId = getNextId();
+                            return new FileStoreSourceSplit(splitId, split);
+                        })
+                .collect(Collectors.toList());
+    }
+
protected final String getNextId() {
// because we just increment numbers, we increment the char
representation directly,
// rather than incrementing an integer and converting it to a string
representation
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 7de7e3c2c..17aa346d9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -27,6 +27,7 @@ import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
@@ -42,6 +43,7 @@ import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
+import java.util.List;
import java.util.Optional;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
@@ -64,6 +66,7 @@ public class FlinkSourceBuilder {
@Nullable private Integer parallelism;
@Nullable private Long limit;
@Nullable private WatermarkStrategy<RowData> watermarkStrategy;
+ @Nullable private List<Split> splits;
public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, Table table) {
this.tableIdentifier = tableIdentifier;
@@ -112,12 +115,18 @@ public class FlinkSourceBuilder {
return this;
}
+ /**
+ * Sets the pre-planned splits that are passed on to the static file store source.
+ * The value may be null, matching the nullable field it is stored in.
+ */
+ public FlinkSourceBuilder withSplits(List<Split> splits) {
+ this.splits = splits;
+ return this;
+ }
+
private StaticFileStoreSource buildStaticFileSource() {
return new StaticFileStoreSource(
table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
limit,
Options.fromMap(table.options())
-
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE));
+
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
+ splits);
}
private ContinuousFileStoreSource buildContinuousFileSource() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index 219f2c13a..c0fa8c9e0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -28,6 +29,7 @@ import
org.apache.flink.api.connector.source.SplitEnumeratorContext;
import javax.annotation.Nullable;
import java.util.Collection;
+import java.util.List;
/** Bounded {@link FlinkSource} for reading records. It does not monitor new
snapshots. */
public class StaticFileStoreSource extends FlinkSource {
@@ -36,19 +38,25 @@ public class StaticFileStoreSource extends FlinkSource {
private final int splitBatchSize;
private final TableScanUtils.TableScanFactory scanFactory;
+ private final List<Split> splitList;
public StaticFileStoreSource(
- ReadBuilder readBuilder, @Nullable Long limit, int splitBatchSize)
{
- this(readBuilder, limit, splitBatchSize, ReadBuilder::newScan);
+ ReadBuilder readBuilder,
+ @Nullable Long limit,
+ int splitBatchSize,
+ List<Split> splitList) {
+ this(readBuilder, limit, splitBatchSize, splitList,
ReadBuilder::newScan);
}
public StaticFileStoreSource(
ReadBuilder readBuilder,
@Nullable Long limit,
int splitBatchSize,
+ List<Split> splitList,
TableScanUtils.TableScanFactory scanFactory) {
super(readBuilder, limit);
this.splitBatchSize = splitBatchSize;
+ this.splitList = splitList;
this.scanFactory = scanFactory;
}
@@ -62,11 +70,16 @@ public class StaticFileStoreSource extends FlinkSource {
SplitEnumeratorContext<FileStoreSourceSplit> context,
PendingSplitsCheckpoint checkpoint) {
Collection<FileStoreSourceSplit> splits =
- checkpoint == null
- ? new FileStoreSourceSplitGenerator()
-
.createSplits(scanFactory.create(readBuilder).plan())
- : checkpoint.splits();
-
+ checkpoint == null ? getSplits() : checkpoint.splits();
return new StaticFileStoreSplitEnumerator(context, null, splits,
splitBatchSize);
}
+
+    /**
+     * Produces the initial source splits for a fresh (non-restored) enumerator: the split list
+     * supplied at construction time is used when present, otherwise a scan created by the scan
+     * factory is planned.
+     */
+    private List<FileStoreSourceSplit> getSplits() {
+        FileStoreSourceSplitGenerator generator = new FileStoreSourceSplitGenerator();
+        if (splitList == null) {
+            return generator.createSplits(scanFactory.create(readBuilder).plan());
+        }
+        return generator.createSplits(splitList);
+    }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index a4d911540..bda8946b6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -24,6 +24,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.table.connector.ChangelogMode;
@@ -33,6 +34,8 @@ import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
+import java.util.List;
+
/** A {@link FlinkTableSource} for system table. */
public class SystemTableSource extends FlinkTableSource {
@@ -73,7 +76,8 @@ public class SystemTableSource extends FlinkTableSource {
if (isStreamingMode && table instanceof DataTable) {
source = new ContinuousFileStoreSource(readBuilder,
table.options(), limit);
} else {
- source = new StaticFileStoreSource(readBuilder, limit,
splitBatchSize);
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ source = new StaticFileStoreSource(readBuilder, limit,
splitBatchSize, splits);
}
return SourceProvider.of(source);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index cfe5f9f35..213625b6b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -57,7 +57,11 @@ import java.util.Map;
import java.util.UUID;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
+import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
import static
org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_PARALLELISM;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
import static org.apache.paimon.flink.FlinkTestBase.createResolvedTable;
@@ -1111,12 +1115,124 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
"",
new HashMap<String, String>() {
{
+
put(INFER_SCAN_PARALLELISM.key(), "false");
put(SCAN_PARALLELISM.key(),
"66");
}
})))
.isEqualTo(66);
}
+    /**
+     * Verifies the source parallelism chosen when {@code scan.infer-parallelism} is enabled:
+     * at least 1 for an empty table, the split count once data exists, capped by a LIMIT, and
+     * the configured {@code scan.parallelism} when inference is disabled.
+     */
+    @Test
+    public void testInferParallelism() throws Exception {
+        String table =
+                createTable(
+                        Arrays.asList("currency STRING", "rate BIGINT"),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<String, String>() {
+                            {
+                                put(SOURCE_SPLIT_OPEN_FILE_COST.key(), "1KB");
+                                put(SOURCE_SPLIT_TARGET_SIZE.key(), "1KB");
+                                put(BUCKET.key(), "2");
+                            }
+                        });
+        // Empty table, infer parallelism should be at least 1
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        new HashMap<String, String>() {
+                                            {
+                                                put(INFER_SCAN_PARALLELISM.key(), "true");
+                                            }
+                                        })))
+                .isEqualTo(1);
+
+        // invalid scan.parallelism (-1), infer parallelism should still be at least 1
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        new HashMap<String, String>() {
+                                            {
+                                                put(INFER_SCAN_PARALLELISM.key(), "true");
+                                                put(SCAN_PARALLELISM.key(), "-1");
+                                            }
+                                        })))
+                .isEqualTo(1);
+
+        // 2 splits, the parallelism is splits num: 2
+        insertInto(table, "('Euro', 119)");
+        insertInto(table, "('US Dollar', 102)");
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        new HashMap<String, String>() {
+                                            {
+                                                put(INFER_SCAN_PARALLELISM.key(), "true");
+                                            }
+                                        })))
+                .isEqualTo(2);
+
+        // 2 splits and limit is 1, the parallelism is the limit value : 1
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        1L,
+                                        new HashMap<String, String>() {
+                                            {
+                                                put(INFER_SCAN_PARALLELISM.key(), "true");
+                                            }
+                                        })))
+                .isEqualTo(1);
+
+        // 2 splits, limit is 3, the parallelism is infer parallelism : min(2, 3) = 2
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        3L,
+                                        new HashMap<String, String>() {
+                                            {
+                                                put(INFER_SCAN_PARALLELISM.key(), "true");
+                                            }
+                                        })))
+                .isEqualTo(2);
+
+        // 2 splits, infer parallelism is disabled, the parallelism is scan.parallelism
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        new HashMap<String, String>() {
+                                            {
+                                                put(INFER_SCAN_PARALLELISM.key(), "false");
+                                                put(SCAN_PARALLELISM.key(), "3");
+                                            }
+                                        })))
+                .isEqualTo(3);
+
+        // for streaming mode
+        // NOTE(review): this query does not enable scan.infer-parallelism (default false), so
+        // the value asserted here may come from the streaming environment's default parallelism
+        // rather than from the bucket number - confirm.
+        assertThat(
+                        sourceParallelismStreaming(
+                                buildQueryWithTableOptions(table, "*", "", new HashMap<>())))
+                .isEqualTo(2);
+    }
+
@Test
public void testSinkParallelism() throws Exception {
testSinkParallelism(null, bExeEnv.getParallelism());
@@ -1206,6 +1322,12 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
return stream.getParallelism();
}
+    /**
+     * Runs the query on the streaming table environment, converts the result to a changelog
+     * stream and returns that stream's parallelism.
+     */
+    private int sourceParallelismStreaming(String sql) {
+        StreamTableEnvironment streamTableEnv = (StreamTableEnvironment) sEnv;
+        DataStream<Row> changelog = streamTableEnv.toChangelogStream(sEnv.sqlQuery(sql));
+        return changelog.getParallelism();
+    }
+
private void testSinkParallelism(Integer configParallelism, int
expectedParallelism)
throws Exception {
// 1. create a mock table sink
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 8eba48b1e..3ffba193f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -235,11 +235,29 @@ public class ReadWriteTableTestUtil {
return buildQueryWithTableOptions(table, projection, filter, new
HashMap<>());
}
+    /**
+     * Builds a {@code SELECT} statement over the table with the given dynamic table options and
+     * filter clause, appending a {@code limit} clause only when a limit is supplied.
+     *
+     * @param table table name, quoted with backticks in the query
+     * @param projection select list
+     * @param filter filter clause text, may be empty
+     * @param limit row limit, or null for no limit clause
+     * @param options dynamic table options rendered by {@code buildTableOptionsSpec}
+     * @return the query text
+     */
+    public static String buildQueryWithTableOptions(
+            String table,
+            String projection,
+            String filter,
+            Long limit,
+            Map<String, String> options) {
+        String query =
+                String.format(
+                        "SELECT %s FROM `%s` %s %s",
+                        projection, table, buildTableOptionsSpec(options), filter);
+        if (limit == null) {
+            return query;
+        }
+        return query + String.format(" limit %s", limit);
+    }
+
public static String buildQueryWithTableOptions(
String table, String projection, String filter, Map<String,
String> options) {
- String queryFormat = "SELECT %s FROM `%s` %s %s;";
- return String.format(
- queryFormat, projection, table,
buildTableOptionsSpec(options), filter);
+ return buildQueryWithTableOptions(table, projection, filter, null,
options);
}
public static void checkFileStorePath(String table, List<String>
partitionSpec) {