This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 78df6c187 [#1747] feat(remote merge): Introduce common merger to merge
multiple data streams. (#1925)
78df6c187 is described below
commit 78df6c187108c00e6b68f9af2504bd0b57c774da
Author: zhengchenyu <[email protected]>
AuthorDate: Mon Jul 22 16:01:49 2024 +0800
[#1747] feat(remote merge): Introduce common merger to merge multiple data
streams. (#1925)
### What changes were proposed in this pull request?
Introduce merger to merge multiple data streams according to key. Minimum
heap K-way merge sorting is used to merge and sort the data streams that have
been partially sorted.
### Why are the changes needed?
Fix: #1747
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
unit test and test in cluster.
---
.../uniffle/common/merger/KeyValueIterator.java | 31 +++
.../apache/uniffle/common/merger/MergeState.java | 35 ++++
.../org/apache/uniffle/common/merger/Merger.java | 208 +++++++++++++++++++++
.../apache/uniffle/common/merger/Recordable.java | 30 +++
.../org/apache/uniffle/common/merger/Segment.java | 41 ++++
.../uniffle/common/merger/StreamedSegment.java | 122 ++++++++++++
.../apache/uniffle/common/merger/MergerTest.java | 113 +++++++++++
.../uniffle/common/serializer/SerializerUtils.java | 103 ++++++++++
8 files changed, 683 insertions(+)
diff --git
a/common/src/main/java/org/apache/uniffle/common/merger/KeyValueIterator.java
b/common/src/main/java/org/apache/uniffle/common/merger/KeyValueIterator.java
new file mode 100644
index 000000000..22344f803
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/merger/KeyValueIterator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+import java.io.IOException;
+
+public interface KeyValueIterator<K, V> {
+
+ K getCurrentKey();
+
+ V getCurrentValue();
+
+ boolean next() throws IOException;
+
+ void close() throws IOException;
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/merger/MergeState.java
b/common/src/main/java/org/apache/uniffle/common/merger/MergeState.java
new file mode 100644
index 000000000..3439d0701
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/merger/MergeState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+public enum MergeState {
+ DONE(0),
+ INITED(1),
+ MERGING(2),
+ INTERNAL_ERROR(3);
+
+ private final int code;
+
+ MergeState(int code) {
+ this.code = code;
+ }
+
+ public int code() {
+ return code;
+ }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/merger/Merger.java
b/common/src/main/java/org/apache/uniffle/common/merger/Merger.java
new file mode 100644
index 000000000..7fcbe2d9b
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/merger/Merger.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.PriorityQueue;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.records.RecordsWriter;
+
+public class Merger {
+
+ public static class MergeQueue<K, V> extends PriorityQueue<Segment>
implements KeyValueIterator {
+
+ private final RssConf rssConf;
+ private final List<Segment> segments;
+ private final Class<K> keyClass;
+ private final Class<V> valueClass;
+ private Comparator comparator;
+ private boolean raw;
+
+ private Object currentKey;
+ private Object currentValue;
+ private Segment minSegment;
+ private Function<Integer, Segment> popSegmentHook;
+
+ public MergeQueue(
+ RssConf rssConf,
+ List<Segment> segments,
+ Class<K> keyClass,
+ Class<V> valueClass,
+ Comparator<K> comparator,
+ boolean raw) {
+ this.rssConf = rssConf;
+ this.segments = segments;
+ this.keyClass = keyClass;
+ this.valueClass = valueClass;
+ if (comparator == null) {
+ throw new RssException("comparator is null!");
+ }
+ this.raw = raw;
+ this.comparator = comparator;
+ }
+
+ public void setPopSegmentHook(Function<Integer, Segment> popSegmentHook) {
+ this.popSegmentHook = popSegmentHook;
+ }
+
+ @Override
+ protected boolean lessThan(Object o1, Object o2) {
+ if (raw) {
+ Segment s1 = (Segment) o1;
+ Segment s2 = (Segment) o2;
+ DataOutputBuffer key1 = (DataOutputBuffer) s1.getCurrentKey();
+ DataOutputBuffer key2 = (DataOutputBuffer) s2.getCurrentKey();
+ int c =
+ ((RawComparator) comparator)
+ .compare(key1.getData(), 0, key1.getLength(), key2.getData(),
0, key2.getLength());
+ return c < 0 || ((c == 0) && s1.getId() < s2.getId());
+ } else {
+ Segment s1 = (Segment) o1;
+ Segment s2 = (Segment) o2;
+ Object key1 = s1.getCurrentKey();
+ Object key2 = s2.getCurrentKey();
+ int c = comparator.compare(key1, key2);
+ return c < 0 || ((c == 0) && s1.getId() < s2.getId());
+ }
+ }
+
+ public void init() throws IOException {
+ List<Segment> segmentsToMerge = new ArrayList();
+ for (Segment segment : segments) {
+ boolean hasNext = segment.next();
+ if (hasNext) {
+ segmentsToMerge.add(segment);
+ } else {
+ segment.close();
+ }
+ }
+ initialize(segmentsToMerge.size());
+ clear();
+ for (Segment segment : segmentsToMerge) {
+ put(segment);
+ }
+ }
+
+ @Override
+ public Object getCurrentKey() {
+ return currentKey;
+ }
+
+ @Override
+ public Object getCurrentValue() {
+ return currentValue;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ if (size() == 0) {
+ resetKeyValue();
+ return false;
+ }
+
+ if (minSegment != null) {
+ adjustPriorityQueue(minSegment);
+ if (size() == 0) {
+ minSegment = null;
+ resetKeyValue();
+ return false;
+ }
+ }
+ minSegment = top();
+ currentKey = minSegment.getCurrentKey();
+ currentValue = minSegment.getCurrentValue();
+ return true;
+ }
+
+ private void resetKeyValue() {
+ currentKey = null;
+ currentValue = null;
+ }
+
+ private void adjustPriorityQueue(Segment segment) throws IOException {
+ if (segment.next()) {
+ adjustTop();
+ } else {
+ pop();
+ segment.close();
+ if (popSegmentHook != null) {
+ Segment newSegment = popSegmentHook.apply((int) segment.getId());
+ if (newSegment != null) {
+ if (newSegment.next()) {
+ put(newSegment);
+ } else {
+ newSegment.close();
+ }
+ }
+ }
+ }
+ }
+
+ void merge(OutputStream output) throws IOException {
+ RecordsWriter<K, V> writer =
+ new RecordsWriter<K, V>(rssConf, output, keyClass, valueClass, raw);
+ boolean recorded = true;
+ while (this.next()) {
+ writer.append(this.getCurrentKey(), this.getCurrentValue());
+ if (output instanceof Recordable) {
+ recorded =
+ ((Recordable) output)
+ .record(writer.getTotalBytesWritten(), () -> writer.flush(),
false);
+ }
+ }
+ writer.flush();
+ if (!recorded) {
+ ((Recordable) output).record(writer.getTotalBytesWritten(), null,
true);
+ }
+ writer.close();
+ }
+
+ @Override
+ public void close() throws IOException {
+ Segment segment;
+ while ((segment = pop()) != null) {
+ segment.close();
+ }
+ }
+ }
+
+ public static void merge(
+ RssConf conf,
+ OutputStream output,
+ List<Segment> segments,
+ Class keyClass,
+ Class valueClass,
+ Comparator comparator,
+ boolean raw)
+ throws IOException {
+ MergeQueue mergeQueue = new MergeQueue(conf, segments, keyClass,
valueClass, comparator, raw);
+ mergeQueue.init();
+ mergeQueue.merge(output);
+ mergeQueue.close();
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/merger/Recordable.java
b/common/src/main/java/org/apache/uniffle/common/merger/Recordable.java
new file mode 100644
index 000000000..79604b7f6
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/merger/Recordable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+import java.io.IOException;
+
+public interface Recordable {
+
+ @FunctionalInterface
+ interface Flushable {
+ void flush() throws IOException;
+ }
+
+ boolean record(long written, Flushable flush, boolean force) throws
IOException;
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/merger/Segment.java
b/common/src/main/java/org/apache/uniffle/common/merger/Segment.java
new file mode 100644
index 000000000..f8a730122
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/merger/Segment.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+import java.io.IOException;
+
+public abstract class Segment {
+
+ private long id;
+
+ public Segment(long id) {
+ this.id = id;
+ }
+
+ public abstract boolean next() throws IOException;
+
+ public abstract Object getCurrentKey();
+
+ public abstract Object getCurrentValue();
+
+ public long getId() {
+ return this.id;
+ }
+
+ public abstract void close() throws IOException;
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
b/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
new file mode 100644
index 000000000..1966a3e51
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+import java.io.File;
+import java.io.IOException;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.records.RecordsReader;
+import org.apache.uniffle.common.serializer.PartialInputStream;
+import org.apache.uniffle.common.serializer.PartialInputStreamImpl;
+
+public class StreamedSegment<K, V> extends Segment {
+
+ private RecordsReader<K, V> reader;
+ ByteBuf byteBuf = null;
+
+ public StreamedSegment(
+ RssConf rssConf,
+ PartialInputStream inputStream,
+ long blockId,
+ Class keyClass,
+ Class valueClass,
+ boolean raw) {
+ super(blockId);
+ this.reader = new RecordsReader<>(rssConf, inputStream, keyClass,
valueClass, raw);
+ }
+
+ public StreamedSegment(
+ RssConf rssConf, ByteBuf byteBuf, long blockId, Class keyClass, Class
valueClass, boolean raw)
+ throws IOException {
+ super(blockId);
+ this.byteBuf = byteBuf;
+ this.byteBuf.retain();
+ byte[] buffer = byteBuf.array();
+ this.reader =
+ new RecordsReader<>(
+ rssConf,
+ PartialInputStreamImpl.newInputStream(buffer, 0, buffer.length),
+ keyClass,
+ valueClass,
+ raw);
+ }
+
+ // The buffer must be sorted by key
+ public StreamedSegment(
+ RssConf rssConf, byte[] buffer, long blockId, Class keyClass, Class
valueClass, boolean raw)
+ throws IOException {
+ super(blockId);
+ this.reader =
+ new RecordsReader<>(
+ rssConf,
+ PartialInputStreamImpl.newInputStream(buffer, 0, buffer.length),
+ keyClass,
+ valueClass,
+ raw);
+ }
+
+ public StreamedSegment(
+ RssConf rssConf,
+ File file,
+ long start,
+ long end,
+ long blockId,
+ Class keyClass,
+ Class valueClass,
+ boolean raw)
+ throws IOException {
+ super(blockId);
+ this.reader =
+ new RecordsReader<K, V>(
+ rssConf,
+ PartialInputStreamImpl.newInputStream(file, start, end),
+ keyClass,
+ valueClass,
+ raw);
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ return this.reader.next();
+ }
+
+ @Override
+ public Object getCurrentKey() {
+ return this.reader.getCurrentKey();
+ }
+
+ @Override
+ public Object getCurrentValue() {
+ return this.reader.getCurrentValue();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (byteBuf != null) {
+ this.byteBuf.release();
+ this.byteBuf = null;
+ }
+ if (this.reader != null) {
+ this.reader.close();
+ this.reader = null;
+ }
+ }
+}
diff --git
a/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
b/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
new file mode 100644
index 000000000..1757ad004
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.io.RawComparator;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.records.RecordsReader;
+import org.apache.uniffle.common.serializer.PartialInputStreamImpl;
+import org.apache.uniffle.common.serializer.SerializerUtils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MergerTest {
+
+ private static final int RECORDS = 1009;
+ private static final int SEGMENTS = 4;
+
+ @ParameterizedTest
+ @ValueSource(
+ strings = {
+ "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable",
+ })
+ public void testMergeSegmentToFile(String classes, @TempDir File tmpDir)
throws Exception {
+ // 1 Parse arguments
+ String[] classArray = classes.split(",");
+ Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+ Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+
+ // 2 Construct segments, then merge
+ RssConf rssConf = new RssConf();
+ List<Segment> segments = new ArrayList<>();
+ Comparator comparator = SerializerUtils.getComparator(keyClass);
+ for (int i = 0; i < SEGMENTS; i++) {
+ if (i % 2 == 0) {
+ segments.add(
+ SerializerUtils.genMemorySegment(
+ rssConf,
+ keyClass,
+ valueClass,
+ i,
+ i,
+ SEGMENTS,
+ RECORDS,
+ comparator instanceof RawComparator));
+ } else {
+ segments.add(
+ SerializerUtils.genFileSegment(
+ rssConf,
+ keyClass,
+ valueClass,
+ i,
+ i,
+ SEGMENTS,
+ RECORDS,
+ tmpDir,
+ comparator instanceof RawComparator));
+ }
+ }
+ File mergedFile = new File(tmpDir, "data.merged");
+ FileOutputStream outputStream = new FileOutputStream(mergedFile);
+ Merger.merge(
+ rssConf,
+ outputStream,
+ segments,
+ keyClass,
+ valueClass,
+ comparator,
+ comparator instanceof RawComparator);
+ outputStream.close();
+
+ // 3 Check the merged file
+ RecordsReader reader =
+ new RecordsReader(
+ rssConf,
+ PartialInputStreamImpl.newInputStream(mergedFile, 0,
mergedFile.length()),
+ keyClass,
+ valueClass,
+ false);
+ int index = 0;
+ while (reader.next()) {
+ assertEquals(SerializerUtils.genData(keyClass, index),
reader.getCurrentKey());
+ assertEquals(SerializerUtils.genData(valueClass, index),
reader.getCurrentValue());
+ index++;
+ }
+ assertEquals(RECORDS * SEGMENTS, index);
+ reader.close();
+ }
+}
diff --git
a/common/src/test/java/org/apache/uniffle/common/serializer/SerializerUtils.java
b/common/src/test/java/org/apache/uniffle/common/serializer/SerializerUtils.java
index b57114fb1..d5675182b 100644
---
a/common/src/test/java/org/apache/uniffle/common/serializer/SerializerUtils.java
+++
b/common/src/test/java/org/apache/uniffle/common/serializer/SerializerUtils.java
@@ -17,12 +17,22 @@
package org.apache.uniffle.common.serializer;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.Comparator;
import com.google.common.base.Objects;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.merger.Segment;
+import org.apache.uniffle.common.merger.StreamedSegment;
+import org.apache.uniffle.common.records.RecordsWriter;
+
public class SerializerUtils {
public static class SomeClass {
@@ -125,4 +135,97 @@ public class SerializerUtils {
}
return null;
}
+
+ public static byte[] genSortedRecordBytes(
+ RssConf rssConf,
+ Class keyClass,
+ Class valueClass,
+ int start,
+ int interval,
+ int length,
+ int replica)
+ throws IOException {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ genSortedRecord(rssConf, keyClass, valueClass, start, interval, length,
output, replica);
+ return output.toByteArray();
+ }
+
+ public static Segment genMemorySegment(
+ RssConf rssConf,
+ Class keyClass,
+ Class valueClass,
+ long blockId,
+ int start,
+ int interval,
+ int length)
+ throws IOException {
+ return genMemorySegment(rssConf, keyClass, valueClass, blockId, start,
interval, length, false);
+ }
+
+ public static Segment genMemorySegment(
+ RssConf rssConf,
+ Class keyClass,
+ Class valueClass,
+ long blockId,
+ int start,
+ int interval,
+ int length,
+ boolean raw)
+ throws IOException {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ genSortedRecord(rssConf, keyClass, valueClass, start, interval, length,
output, 1);
+ return new StreamedSegment(rssConf, output.toByteArray(), blockId,
keyClass, valueClass, raw);
+ }
+
+ public static Segment genFileSegment(
+ RssConf rssConf,
+ Class keyClass,
+ Class valueClass,
+ long blockId,
+ int start,
+ int interval,
+ int length,
+ File tmpDir)
+ throws IOException {
+ return genFileSegment(
+ rssConf, keyClass, valueClass, blockId, start, interval, length,
tmpDir, false);
+ }
+
+ public static Segment genFileSegment(
+ RssConf rssConf,
+ Class keyClass,
+ Class valueClass,
+ long blockId,
+ int start,
+ int interval,
+ int length,
+ File tmpDir,
+ boolean raw)
+ throws IOException {
+ File file = new File(tmpDir, "data." + start);
+ genSortedRecord(
+ rssConf, keyClass, valueClass, start, interval, length, new
FileOutputStream(file), 1);
+ return new StreamedSegment(rssConf, file, 0, file.length(), blockId,
keyClass, valueClass, raw);
+ }
+
+ private static void genSortedRecord(
+ RssConf rssConf,
+ Class keyClass,
+ Class valueClass,
+ int start,
+ int interval,
+ int length,
+ OutputStream output,
+ int replica)
+ throws IOException {
+ RecordsWriter writer = new RecordsWriter(rssConf, output, keyClass,
valueClass, false);
+ for (int i = 0; i < length; i++) {
+ for (int j = 0; j < replica; j++) {
+ writer.append(
+ SerializerUtils.genData(keyClass, start + i * interval),
+ SerializerUtils.genData(valueClass, start + i * interval));
+ }
+ }
+ writer.close();
+ }
}