This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 89a183f22007ad1b9afa9f7d9f2124d0e79fbae8 Author: JingsongLi <[email protected]> AuthorDate: Thu Jan 13 17:43:20 2022 +0800 [FLINK-25630] Introduce MergeTree writer and reader This closes #6 --- .../table/store/file/mergetree/MergeTree.java | 112 +++++++ .../store/file/mergetree/MergeTreeReader.java | 122 ++++++++ .../store/file/mergetree/MergeTreeWriter.java | 185 ++++++++++++ .../table/store/file/mergetree/MergeTreeTest.java | 323 +++++++++++++++++++++ 4 files changed, 742 insertions(+) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java new file mode 100644 index 0000000..84142d4 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTree.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.mergetree.compact.Accumulator; +import org.apache.flink.table.store.file.mergetree.compact.CompactManager; +import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy; +import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction; +import org.apache.flink.table.store.file.mergetree.sst.SstFile; +import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta; +import org.apache.flink.table.store.file.utils.RecordReader; +import org.apache.flink.table.store.file.utils.RecordReaderIterator; +import org.apache.flink.table.store.file.utils.RecordWriter; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ExecutorService; + +/** A merge tree, provides writer and reader, the granularity of the change is the file. */ +public class MergeTree { + + private final MergeTreeOptions options; + + private final SstFile sstFile; + + private final Comparator<RowData> keyComparator; + + private final ExecutorService compactExecutor; + + private final Accumulator accumulator; + + public MergeTree( + MergeTreeOptions options, + SstFile sstFile, + Comparator<RowData> keyComparator, + ExecutorService compactExecutor, + Accumulator accumulator) { + this.options = options; + this.sstFile = sstFile; + this.keyComparator = keyComparator; + this.compactExecutor = compactExecutor; + this.accumulator = accumulator; + } + + /** + * Create {@link RecordWriter} from restored files. Some compaction of files may occur during + * the write process. + */ + public RecordWriter createWriter(List<SstFileMeta> restoreFiles) { + long maxSequenceNumber = + restoreFiles.stream() + .map(SstFileMeta::maxSequenceNumber) + .max(Long::compare) + .orElse(-1L); + return new MergeTreeWriter( + new SortBufferMemTable( + sstFile.keyType(), + sstFile.valueType(), + options.writeBufferSize, + options.pageSize), + createCompactManager(), + new Levels(keyComparator, restoreFiles, options.numLevels), + maxSequenceNumber, + keyComparator, + accumulator.copy(), + sstFile, + options.commitForceCompact); + } + + /** + * Create {@link RecordReader} from file sections. The caller can decide whether to drop the + * deletion record. + */ + public RecordReader createReader(List<List<SortedRun>> sections, boolean dropDelete) + throws IOException { + return new MergeTreeReader( + sections, dropDelete, sstFile, keyComparator, accumulator.copy()); + } + + private CompactManager createCompactManager() { + CompactStrategy compactStrategy = + new UniversalCompaction( + options.maxSizeAmplificationPercent, + options.sizeRatio, + options.numSortedRunMax); + CompactManager.Rewriter rewriter = + (outputLevel, dropDelete, sections) -> + sstFile.write( + new RecordReaderIterator(createReader(sections, dropDelete)), + outputLevel); + return new CompactManager( + compactExecutor, compactStrategy, keyComparator, options.targetFileSize, rewriter); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java new file mode 100644 index 0000000..30edafb --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.ValueKind; +import org.apache.flink.table.store.file.mergetree.compact.Accumulator; +import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader; +import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier; +import org.apache.flink.table.store.file.mergetree.compact.SortMergeReader; +import org.apache.flink.table.store.file.mergetree.sst.SstFile; +import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta; +import org.apache.flink.table.store.file.utils.RecordReader; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** A {@link RecordReader} to read merge tree sections. */ +public class MergeTreeReader implements RecordReader { + + private final RecordReader reader; + + private final boolean dropDelete; + + public MergeTreeReader( + List<List<SortedRun>> sections, + boolean dropDelete, + SstFile sstFile, + Comparator<RowData> userKeyComparator, + Accumulator accumulator) + throws IOException { + this.dropDelete = dropDelete; + + List<ReaderSupplier> readers = new ArrayList<>(); + for (List<SortedRun> section : sections) { + readers.add(() -> readerForSection(section, sstFile, userKeyComparator, accumulator)); + } + this.reader = ConcatRecordReader.create(readers); + } + + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + RecordIterator batch = reader.readBatch(); + + if (!dropDelete) { + return batch; + } + + if (batch == null) { + return null; + } + + return new RecordIterator() { + @Override + public KeyValue next() throws IOException { + while (true) { + KeyValue kv = batch.next(); + if (kv == null) { + return null; + } + + if (kv.valueKind() == ValueKind.ADD) { + return kv; + } + } + } + + @Override + public void releaseBatch() { + batch.releaseBatch(); + } + }; + } + + @Override + public void close() throws IOException { + reader.close(); + } + + public static RecordReader readerForSection( + List<SortedRun> section, + SstFile sstFile, + Comparator<RowData> userKeyComparator, + Accumulator accumulator) + throws IOException { + List<RecordReader> readers = new ArrayList<>(); + for (SortedRun run : section) { + readers.add(readerForRun(run, sstFile)); + } + return SortMergeReader.create(readers, userKeyComparator, accumulator); + } + + public static RecordReader readerForRun(SortedRun run, SstFile sstFile) throws IOException { + List<ReaderSupplier> readers = new ArrayList<>(); + for (SstFileMeta file : run.files()) { + readers.add(() -> sstFile.read(file.fileName())); + } + return ConcatRecordReader.create(readers); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java new file mode 100644 index 0000000..5cb4a3d --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.ValueKind; +import org.apache.flink.table.store.file.mergetree.compact.Accumulator; +import org.apache.flink.table.store.file.mergetree.compact.CompactManager; +import org.apache.flink.table.store.file.mergetree.sst.SstFile; +import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta; +import org.apache.flink.table.store.file.utils.RecordWriter; +import org.apache.flink.util.CloseableIterator; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +/** A {@link RecordWriter} to write records and generate {@link Increment}. */ +public class MergeTreeWriter implements RecordWriter { + + private final MemTable memTable; + + private final CompactManager compactManager; + + private final Levels levels; + + private final Comparator<RowData> keyComparator; + + private final Accumulator accumulator; + + private final SstFile sstFile; + + private final boolean commitForceCompact; + + private final LinkedHashSet<SstFileMeta> newFiles; + + private final LinkedHashSet<SstFileMeta> compactBefore; + + private final LinkedHashSet<SstFileMeta> compactAfter; + + private long newSequenceNumber; + + public MergeTreeWriter( + MemTable memTable, + CompactManager compactManager, + Levels levels, + long maxSequenceNumber, + Comparator<RowData> keyComparator, + Accumulator accumulator, + SstFile sstFile, + boolean commitForceCompact) { + this.memTable = memTable; + this.compactManager = compactManager; + this.levels = levels; + this.newSequenceNumber = maxSequenceNumber + 1; + this.keyComparator = keyComparator; + this.accumulator = accumulator; + this.sstFile = sstFile; + this.commitForceCompact = commitForceCompact; + this.newFiles = new LinkedHashSet<>(); + this.compactBefore = new LinkedHashSet<>(); + this.compactAfter = new LinkedHashSet<>(); + } + + private long newSequenceNumber() { + return newSequenceNumber++; + } + + @VisibleForTesting + Levels levels() { + return levels; + } + + @Override + public void write(ValueKind valueKind, RowData key, RowData value) throws Exception { + long sequenceNumber = newSequenceNumber(); + boolean success = memTable.put(sequenceNumber, valueKind, key, value); + if (!success) { + flush(); + success = memTable.put(sequenceNumber, valueKind, key, value); + if (!success) { + throw new RuntimeException("Mem table is too small to hold a single element."); + } + } + } + + private void flush() throws Exception { + if (memTable.size() > 0) { + finishCompaction(); + Iterator<KeyValue> iterator = memTable.iterator(keyComparator, accumulator); + List<SstFileMeta> files = + sstFile.write(CloseableIterator.adapterForIterator(iterator), 0); + newFiles.addAll(files); + files.forEach(levels::addLevel0File); + memTable.clear(); + submitCompaction(); + } + } + + @Override + public Increment prepareCommit() throws Exception { + flush(); + if (commitForceCompact) { + finishCompaction(); + } + return drainIncrement(); + } + + @Override + public void sync() throws Exception { + finishCompaction(); + } + + private Increment drainIncrement() { + Increment increment = + new Increment( + new ArrayList<>(newFiles), + new ArrayList<>(compactBefore), + new ArrayList<>(compactAfter)); + newFiles.clear(); + compactBefore.clear(); + compactAfter.clear(); + return increment; + } + + private void updateCompactResult(CompactManager.CompactResult result) { + for (SstFileMeta file : result.before()) { + boolean removed = compactAfter.remove(file); + if (removed) { + // This is an intermediate file (not a new data file), which is no longer needed + // after compaction and can be deleted directly + sstFile.delete(file); + } else { + compactBefore.add(file); + } + } + compactAfter.addAll(result.after()); + } + + private void submitCompaction() { + compactManager.submitCompaction(levels); + } + + private void finishCompaction() throws ExecutionException, InterruptedException { + Optional<CompactManager.CompactResult> result = compactManager.finishCompaction(levels); + if (result.isPresent()) { + updateCompactResult(result.get()); + } + } + + @Override + public List<SstFileMeta> close() { + // delete temporary files + List<SstFileMeta> delete = new ArrayList<>(newFiles); + delete.addAll(compactAfter); + for (SstFileMeta file : delete) { + sstFile.delete(file); + } + newFiles.clear(); + compactAfter.clear(); + return delete; + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java new file mode 100644 index 0000000..5c00ccc --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.ValueKind; +import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator; +import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition; +import org.apache.flink.table.store.file.mergetree.sst.SstFile; +import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta; +import org.apache.flink.table.store.file.mergetree.sst.SstFileTest; +import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory; +import org.apache.flink.table.store.file.utils.RecordReader; +import org.apache.flink.table.store.file.utils.RecordReaderIterator; +import org.apache.flink.table.store.file.utils.RecordWriter; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link MergeTree}. */ +public class MergeTreeTest { + + @TempDir java.nio.file.Path tempDir; + + private static ExecutorService service; + + private SstPathFactory fileFactory; + + private Comparator<RowData> comparator; + + private SstFile sstFile; + + private MergeTree mergeTree; + + private RecordWriter writer; + + @BeforeEach + public void beforeEach() throws IOException { + fileFactory = new SstPathFactory(new Path(tempDir.toString()), null, 123); + Path bucketDir = fileFactory.toPath("ignore").getParent(); + bucketDir.getFileSystem().mkdirs(bucketDir); + + comparator = Comparator.comparingInt(o -> o.getInt(0)); + Configuration configuration = new Configuration(); + configuration.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3)); + configuration.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(4096)); + MergeTreeOptions options = new MergeTreeOptions(configuration); + sstFile = + new SstFile( + new RowType(singletonList(new RowType.RowField("k", new IntType()))), + new RowType(singletonList(new RowType.RowField("v", new IntType()))), + new SstFileTest.FlushingAvroFormat(), + fileFactory, + options.targetFileSize); + mergeTree = + new MergeTree(options, sstFile, comparator, service, new DeduplicateAccumulator()); + writer = mergeTree.createWriter(new ArrayList<>()); + } + + @BeforeAll + public static void before() { + service = Executors.newSingleThreadExecutor(); + } + + @AfterAll + public static void after() { + service.shutdownNow(); + service = null; + } + + @Test + public void testEmpty() throws Exception { + doTestWriteRead(0); + } + + @Test + public void test1() throws Exception { + doTestWriteRead(1); + } + + @Test + public void test2() throws Exception { + doTestWriteRead(new Random().nextInt(2)); + } + + @Test + public void test8() throws Exception { + doTestWriteRead(new Random().nextInt(8)); + } + + @Test + public void testRandom() throws Exception { + doTestWriteRead(new Random().nextInt(20)); + } + + @Test + public void testRestore() throws Exception { + List<TestRecord> expected = new ArrayList<>(writeBatch()); + List<SstFileMeta> newFiles = writer.prepareCommit().newFiles(); + writer = mergeTree.createWriter(newFiles); + expected.addAll(writeBatch()); + writer.prepareCommit(); + writer.sync(); + assertRecords(expected); + } + + @Test + public void testClose() throws Exception { + doTestWriteRead(6); + List<SstFileMeta> files = writer.close(); + for (SstFileMeta file : files) { + Path path = fileFactory.toPath(file.fileName()); + assertThat(path.getFileSystem().exists(path)).isFalse(); + } + } + + @Test + public void testWriteMany() throws Exception { + doTestWriteRead(3, 20_000); + } + + private void doTestWriteRead(int batchNumber) throws Exception { + doTestWriteRead(batchNumber, 200); + } + + private void doTestWriteRead(int batchNumber, int perBatch) throws Exception { + List<TestRecord> expected = new ArrayList<>(); + List<SstFileMeta> newFiles = new ArrayList<>(); + Set<String> newFileNames = new HashSet<>(); + List<SstFileMeta> compactedFiles = new ArrayList<>(); + + // write batch and commit + for (int i = 0; i <= batchNumber; i++) { + if (i < batchNumber) { + expected.addAll(writeBatch(perBatch)); + } else { + writer.sync(); + } + + Increment increment = writer.prepareCommit(); + newFiles.addAll(increment.newFiles()); + increment.newFiles().stream().map(SstFileMeta::fileName).forEach(newFileNames::add); + + // merge compacted + compactedFiles.addAll(increment.newFiles()); + for (SstFileMeta file : increment.compactBefore()) { + boolean remove = compactedFiles.remove(file); + assertThat(remove).isTrue(); + if (!newFileNames.contains(file.fileName())) { + sstFile.delete(file); + } + } + compactedFiles.addAll(increment.compactAfter()); + } + + // assert records from writer + assertRecords(expected); + + // assert records from increment new files + assertRecords(expected, newFiles, false); + assertRecords(expected, newFiles, true); + + // assert records from increment compacted files + assertRecords(expected, compactedFiles, true); + + Path bucketDir = fileFactory.toPath("ignore").getParent(); + Set<String> files = + Arrays.stream(bucketDir.getFileSystem().listStatus(bucketDir)) + .map(FileStatus::getPath) + .map(Path::getName) + .collect(Collectors.toSet()); + newFiles.stream().map(SstFileMeta::fileName).forEach(files::remove); + compactedFiles.stream().map(SstFileMeta::fileName).forEach(files::remove); + assertThat(files).isEqualTo(Collections.emptySet()); + } + + private List<TestRecord> writeBatch() throws Exception { + return writeBatch(200); + } + + private List<TestRecord> writeBatch(int perBatch) throws Exception { + List<TestRecord> records = generateRandom(perBatch); + writeAll(records); + return records; + } + + private void assertRecords(List<TestRecord> expected) throws Exception { + // compaction will drop delete + List<SstFileMeta> files = ((MergeTreeWriter) writer).levels().allFiles(); + assertRecords(expected, files, true); + } + + private void assertRecords( + List<TestRecord> expected, List<SstFileMeta> files, boolean dropDelete) + throws Exception { + assertThat(readAll(files, dropDelete)).isEqualTo(compactAndSort(expected, dropDelete)); + } + + private List<TestRecord> compactAndSort(List<TestRecord> records, boolean dropDelete) { + TreeMap<Integer, TestRecord> map = new TreeMap<>(); + for (TestRecord record : records) { + map.put(record.k, record); + } + if (dropDelete) { + return map.values().stream() + .filter(record -> record.kind == ValueKind.ADD) + .collect(Collectors.toList()); + } + return new ArrayList<>(map.values()); + } + + private void writeAll(List<TestRecord> records) throws Exception { + for (TestRecord record : records) { + writer.write(record.kind, row(record.k), row(record.v)); + } + } + + private List<TestRecord> readAll(List<SstFileMeta> files, boolean dropDelete) throws Exception { + RecordReader reader = + mergeTree.createReader( + new IntervalPartition(files, comparator).partition(), dropDelete); + List<TestRecord> records = new ArrayList<>(); + try (RecordReaderIterator iterator = new RecordReaderIterator(reader)) { + while (iterator.hasNext()) { + KeyValue kv = iterator.next(); + records.add( + new TestRecord(kv.valueKind(), kv.key().getInt(0), kv.value().getInt(0))); + } + } + return records; + } + + private RowData row(int i) { + return GenericRowData.of(i); + } + + private List<TestRecord> generateRandom(int perBatch) { + Random random = new Random(); + List<TestRecord> records = new ArrayList<>(perBatch); + for (int i = 0; i < perBatch; i++) { + records.add( + new TestRecord( + random.nextBoolean() ? ValueKind.ADD : ValueKind.DELETE, + random.nextInt(perBatch / 2), + random.nextInt())); + } + return records; + } + + private static class TestRecord { + + private final ValueKind kind; + private final int k; + private final int v; + + private TestRecord(ValueKind kind, int k, int v) { + this.kind = kind; + this.k = k; + this.v = v; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestRecord that = (TestRecord) o; + return k == that.k && v == that.v && kind == that.kind; + } + + @Override + public String toString() { + return "TestRecord{" + "kind=" + kind + ", k=" + k + ", v=" + v + '}'; + } + } +}
