This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ff64c8ca4 [hotfix] Improve spark procedures document format and duplicate code calls (#3716)
ff64c8ca4 is described below
commit ff64c8ca46c1e8593857eb15ef07c93045959b30
Author: Kerwin <[email protected]>
AuthorDate: Thu Jul 11 09:53:27 2024 +0800
[hotfix] Improve spark procedures document format and duplicate code calls (#3716)
---
docs/content/spark/procedures.md | 16 +++++-----
.../java/org/apache/paimon/stats/Statistics.java | 34 ++++++++++------------
.../org/apache/paimon/stats/StatsFileHandler.java | 2 +-
.../apache/paimon/utils/FileStorePathFactory.java | 10 +++----
.../apache/paimon/utils/ScanParallelExecutor.java | 2 +-
5 files changed, 30 insertions(+), 34 deletions(-)
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 43b82cb6c..d464881d6 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -49,8 +49,8 @@ This section introduce all available spark procedures about paimon.
<li>max_concurrent_jobs: when sort compact is used, files in one
partition are grouped and submitted as a single spark compact job. This
parameter controls the maximum number of jobs that can be submitted
simultaneously. The default value is 15.</li>
</td>
<td>
- SET spark.sql.shuffle.partitions=10; --set the compact parallelism
<br/>
- CALL sys.compact(table => 'T', partitions => 'p=0;p=1',
order_strategy => 'zorder', order_by => 'a,b') <br/>
+ SET spark.sql.shuffle.partitions=10; --set the compact parallelism
<br/><br/>
+ CALL sys.compact(table => 'T', partitions => 'p=0;p=1',
order_strategy => 'zorder', order_by => 'a,b') <br/><br/>
CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy
=> 'zorder', order_by => 'a,b')
</td>
</tr>
@@ -87,7 +87,7 @@ This section introduce all available spark procedures about paimon.
</td>
<td>
-- based on snapshot 10 with 1d <br/>
- CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot
=> 10, time_retained => '1 d') <br/>
+ CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot
=> 10, time_retained => '1 d') <br/><br/>
-- based on the latest snapshot <br/>
CALL sys.create_tag(table => 'default.T', tag => 'my_tag')
</td>
@@ -109,7 +109,7 @@ This section introduce all available spark procedures about paimon.
<li>version: id of the snapshot or name of tag that will roll back
to.</li>
</td>
<td>
- CALL sys.rollback(table => 'default.T', version => 'my_tag')<br/>
+ CALL sys.rollback(table => 'default.T', version =>
'my_tag')<br/><br/>
CALL sys.rollback(table => 'default.T', version => 10)
</td>
</tr>
@@ -146,9 +146,7 @@ This section introduce all available spark procedures about paimon.
<li>database_or_table: empty or the target database name or the
target table identifier, if you specify multiple tags, delimiter is ','</li>
</td>
<td>
- CALL sys.repair('test_db.T')
- </td>
- <td>
+ CALL sys.repair('test_db.T')<br/><br/>
CALL sys.repair('test_db.T,test_db01,test_db.T2')
</td>
</tr>
@@ -162,8 +160,8 @@ This section introduce all available spark procedures about paimon.
<li>snapshot(Long): id of the snapshot which the new tag is based
on.</li>
</td>
<td>
- CALL sys.create_branch(table => 'test_db.T', branch =>
'test_branch')<br/>
- CALL sys.create_branch(table => 'test_db.T', branch =>
'test_branch', tag => 'my_tag')<br/>
+ CALL sys.create_branch(table => 'test_db.T', branch =>
'test_branch')<br/><br/>
+ CALL sys.create_branch(table => 'test_db.T', branch =>
'test_branch', tag => 'my_tag')<br/><br/>
CALL sys.create_branch(table => 'test_db.T', branch =>
'test_branch', snapshot => 10)
</td>
</tr>
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java b/paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java
index 5d9403044..32c3699c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.OptionalUtils;
@@ -38,6 +39,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
+import java.util.stream.Collectors;
/**
* Global stats, supports the following stats.
@@ -119,18 +121,16 @@ public class Statistics {
public void serializeFieldsToString(TableSchema schema) {
try {
if (colStats != null) {
+ Map<String, DataType> fields =
+ schema.fields().stream()
+ .collect(Collectors.toMap(DataField::name,
DataField::type));
for (Map.Entry<String, ColStats<?>> entry :
colStats.entrySet()) {
String colName = entry.getKey();
ColStats<?> colStats = entry.getValue();
- DataType type =
- schema.fields().stream()
- .filter(field ->
field.name().equals(colName))
- .findFirst()
- .orElseThrow(
- () ->
- new IllegalStateException(
- "Unable to obtain
the latest schema"))
- .type();
+ DataType type = fields.get(colName);
+ if (type == null) {
+ throw new IllegalStateException("Unable to obtain the
latest schema");
+ }
colStats.serializeFieldsToString(type);
}
}
@@ -142,18 +142,16 @@ public class Statistics {
public void deserializeFieldsFromString(TableSchema schema) {
try {
if (colStats != null) {
+ Map<String, DataType> fields =
+ schema.fields().stream()
+ .collect(Collectors.toMap(DataField::name,
DataField::type));
for (Map.Entry<String, ColStats<?>> entry :
colStats.entrySet()) {
String colName = entry.getKey();
ColStats<?> colStats = entry.getValue();
- DataType type =
- schema.fields().stream()
- .filter(field ->
field.name().equals(colName))
- .findFirst()
- .orElseThrow(
- () ->
- new IllegalStateException(
- "Unable to obtain
the latest schema"))
- .type();
+ DataType type = fields.get(colName);
+ if (type == null) {
+ throw new IllegalStateException("Unable to obtain the
latest schema");
+ }
colStats.deserializeFieldsFromString(type);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
index 0ca23fad5..f9e057c7c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
@@ -56,7 +56,7 @@ public class StatsFileHandler {
public Optional<Statistics> readStats() {
Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (latestSnapshotId == null) {
- throw new IllegalStateException("Unable to obtain the latest
schema");
+ throw new IllegalStateException("Unable to obtain the latest
snapshot");
}
return readStats(latestSnapshotId);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 7696f9ada..a49e6dbc4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -106,11 +106,11 @@ public class FileStorePathFactory {
public Path relativePartitionAndBucketPath(BinaryRow partition, int
bucket) {
String partitionPath = getPartitionString(partition);
- if (partitionPath.isEmpty()) {
- return new Path(BUCKET_PATH_PREFIX + bucket);
- } else {
- return new Path(getPartitionString(partition) + "/" +
BUCKET_PATH_PREFIX + bucket);
- }
+ String fullPath =
+ partitionPath.isEmpty()
+ ? BUCKET_PATH_PREFIX + bucket
+ : partitionPath + "/" + BUCKET_PATH_PREFIX + bucket;
+ return new Path(fullPath);
}
/** IMPORTANT: This method is NOT THREAD SAFE. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java b/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
index 969dbbf9d..6bf6b3517 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
@@ -45,7 +45,7 @@ public class ScanParallelExecutor {
if (queueSize == null) {
queueSize = COMMON_IO_FORK_JOIN_POOL.getParallelism();
} else if (queueSize <= 0) {
- throw new NegativeArraySizeException("queue size should not be
negetive");
+ throw new NegativeArraySizeException("queue size should not be
negative");
}
final Queue<List<U>> stack = new ArrayDeque<>(Lists.partition(input,
queueSize));