This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6f72239dc [hotfix] Fix code style for zorder
6f72239dc is described below
commit 6f72239dce8d3c3d669b48c8109c37372820aa5d
Author: Jingsong <[email protected]>
AuthorDate: Wed Aug 23 21:40:09 2023 +0800
[hotfix] Fix code style for zorder
---
.../java/org/apache/paimon/sort/zorder/ZIndexer.java | 3 +--
.../java/org/apache/paimon/utils/ZOrderByteUtils.java | 7 ++++---
.../org/apache/paimon/flink/shuffle/RangeShuffle.java | 18 +++++++-----------
.../apache/paimon/flink/sorter/ZorderSorterUtils.java | 5 ++---
.../flink/source/assigners/PreAssignSplitAssigner.java | 2 +-
5 files changed, 15 insertions(+), 20 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
index de281aae3..d1f18a193 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
@@ -105,8 +105,7 @@ public class ZIndexer implements Serializable {
public Set<RowProcessor> constructFunctionMap(List<DataField> fields) {
Set<RowProcessor> zorderFunctionSet = new LinkedHashSet<>();
// Construct zorderFunctionSet and fill dataTypes, rowFields
- for (int fieldIndex = 0; fieldIndex < fieldsIndex.length; fieldIndex++) {
- int index = fieldsIndex[fieldIndex];
+ for (int index : fieldsIndex) {
DataField field = fields.get(index);
zorderFunctionSet.add(zmapColumnToCalculator(field, index));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
index 1ea4194c2..220aaa004 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
@@ -45,7 +45,8 @@ public class ZOrderByteUtils {
public static final int PRIMITIVE_BUFFER_SIZE = 8;
public static final byte[] NULL_BYTES = new byte[PRIMITIVE_BUFFER_SIZE];
- private static ThreadLocal<CharsetEncoder> encoderThreadLocal = new ThreadLocal<>();
+
+ private static final ThreadLocal<CharsetEncoder> ENCODER = new ThreadLocal<>();
static {
Arrays.fill(NULL_BYTES, (byte) 0x00);
@@ -133,10 +134,10 @@ public class ZOrderByteUtils {
*/
@SuppressWarnings("ByteBufferBackingArray")
public static ByteBuffer stringToOrderedBytes(String val, int length,
ByteBuffer reuse) {
- CharsetEncoder encoder = encoderThreadLocal.get();
+ CharsetEncoder encoder = ENCODER.get();
if (encoder == null) {
encoder = StandardCharsets.UTF_8.newEncoder();
- encoderThreadLocal.set(encoder);
+ ENCODER.set(encoder);
}
ByteBuffer bytes = reuse(reuse, length);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
index d07615e93..7318a883d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -200,8 +200,7 @@ public class RangeShuffle {
*
* <p>See {@link Sampler}.
*/
- @Internal
- public static class GlobalSampleOperator<T> extends TableStreamOperator<List<T>>
+ private static class GlobalSampleOperator<T> extends TableStreamOperator<List<T>>
implements OneInputStreamOperator<Tuple2<Double, T>, List<T>>,
BoundedOneInput {
private static final long serialVersionUID = 1L;
@@ -222,7 +221,6 @@ public class RangeShuffle {
@Override
public void open() throws Exception {
super.open();
- //noinspection unchecked
this.sampler = new Sampler<>(numSample, 0L);
this.collector = new StreamRecordCollector<>(output);
}
@@ -245,6 +243,7 @@ public class RangeShuffle {
sampledData.sort(keyComparator);
int boundarySize = rangesNum - 1;
+ @SuppressWarnings("unchecked")
T[] boundaries = (T[]) new Object[boundarySize];
if (sampledData.size() > 0) {
double avgRange = sampledData.size() / (double) rangesNum;
@@ -262,8 +261,7 @@ public class RangeShuffle {
* This two-input-operator require a input with RangeBoundaries as
broadcast input, and generate
* Tuple2 which includes range index and record from the other input
itself as output.
*/
- @Internal
- public static class AssignRangeIndexOperator<T>
+ private static class AssignRangeIndexOperator<T>
extends TableStreamOperator<Tuple2<Integer, Pair<T, RowData>>>
implements TwoInputStreamOperator<
List<T>, Pair<T, RowData>, Tuple2<Integer, Pair<T,
RowData>>>,
@@ -287,12 +285,12 @@ public class RangeShuffle {
}
@Override
- public void processElement1(StreamRecord<List<T>> streamRecord) throws Exception {
+ public void processElement1(StreamRecord<List<T>> streamRecord) {
this.boundaries = streamRecord.getValue();
}
@Override
- public void processElement2(StreamRecord<Pair<T, RowData>> streamRecord) throws Exception {
+ public void processElement2(StreamRecord<Pair<T, RowData>> streamRecord) {
if (boundaries == null) {
throw new RuntimeException("There should be one data from the
first input.");
}
@@ -361,8 +359,7 @@ public class RangeShuffle {
}
/** Remove the range index and return the actual record. */
- @Internal
- public static class RemoveRangeIndexOperator<T> extends TableStreamOperator<Pair<T, RowData>>
+ private static class RemoveRangeIndexOperator<T> extends TableStreamOperator<Pair<T, RowData>>
implements OneInputStreamOperator<Tuple2<Integer, Pair<T,
RowData>>, Pair<T, RowData>> {
private static final long serialVersionUID = 1L;
@@ -395,8 +392,7 @@ public class RangeShuffle {
* href="researcher.ibm.com/files/us-dpwoodru/tw11.pdf">"Optimal Random
Sampling from
* Distributed Streams Revisited"</a>.
*/
- @Internal
- public static class Sampler<T> {
+ private static class Sampler<T> {
private final int numSamples;
private final Random random;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
index 7e6c78192..dd94255a8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
@@ -67,7 +67,6 @@ public class ZorderSorterUtils {
* @param inputStream the stream wait to be ordered
* @param zIndexer generate z-index by the given row
* @param table the FileStoreTable
- * @return
*/
public static DataStream<RowData> sortStreamByZorder(
DataStream<RowData> inputStream, ZIndexer zIndexer, FileStoreTable
table) {
@@ -137,7 +136,7 @@ public class ZorderSorterUtils {
private transient KeyProjectedRow keyProjectedRow;
@Override
- public void open(Configuration parameters) throws Exception {
+ public void open(Configuration parameters) {
int[] map = new int[fieldCount];
for (int i = 0; i < map.length; i++) {
map[i] = i + 1;
@@ -146,7 +145,7 @@ public class ZorderSorterUtils {
}
@Override
- public InternalRow map(InternalRow value) throws Exception {
+ public InternalRow map(InternalRow value) {
return keyProjectedRow.replaceRow(value);
}
})
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
index 2fb4ee2e4..63e24026f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
@@ -35,7 +35,7 @@ import java.util.Map;
import java.util.Queue;
/**
- * Pre-calculate which splits each task should zvalue according to the weight, and then distribute
+ * Pre-calculate which splits each task should process according to the weight, and then distribute
* the splits fairly.
*/
public class PreAssignSplitAssigner implements SplitAssigner {