This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit ac9d902bef8959742891ff63d435aa498507f084
Author: JingsongLi <[email protected]>
AuthorDate: Thu Jan 13 17:05:57 2022 +0800

    [FLINK-25629] Introduce CompactManager
    
    This closes #5
---
 .../flink/table/store/file/mergetree/Levels.java   | 138 ++++++++++
 .../file/mergetree/compact/CompactManager.java     | 214 +++++++++++++++
 .../file/mergetree/compact/CompactManagerTest.java | 288 +++++++++++++++++++++
 3 files changed, 640 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
new file mode 100644
index 0000000..5c99b04
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** A class which stores all level files of a merge tree. */
+public class Levels {
+
+    private final Comparator<RowData> keyComparator;
+
+    private final TreeSet<SstFileMeta> level0;
+
+    private final List<SortedRun> levels;
+
+    public Levels(Comparator<RowData> keyComparator, List<SstFileMeta> inputFiles, int numLevels) {
+        this.keyComparator = keyComparator;
+        checkArgument(numLevels > 1, "levels must be at least 2.");
+        this.level0 =
+                new TreeSet<>(Comparator.comparing(SstFileMeta::maxSequenceNumber).reversed());
+        this.levels = new ArrayList<>();
+        for (int i = 1; i < numLevels; i++) {
+            levels.add(SortedRun.empty());
+        }
+
+        Map<Integer, List<SstFileMeta>> levelMap = new HashMap<>();
+        for (SstFileMeta file : inputFiles) {
+            levelMap.computeIfAbsent(file.level(), level -> new ArrayList<>()).add(file);
+        }
+        levelMap.forEach((level, files) -> updateLevel(level, emptyList(), files));
+    }
+
+    public void addLevel0File(SstFileMeta file) {
+        checkArgument(file.level() == 0);
+        level0.add(file);
+    }
+
+    public SortedRun runOfLevel(int level) {
+        checkArgument(level > 0, "Level 0 does not have a single sorted run.");
+        return levels.get(level - 1);
+    }
+
+    public int numberOfLevels() {
+        return levels.size() + 1;
+    }
+
+    /** @return the highest non-empty level, or -1 if all levels are empty. */
+    public int nonEmptyHighestLevel() {
+        int i;
+        for (i = levels.size() - 1; i >= 0; i--) {
+            if (levels.get(i).nonEmpty()) {
+                return i + 1;
+            }
+        }
+        return level0.isEmpty() ? -1 : 0;
+    }
+
+    public List<SstFileMeta> allFiles() {
+        List<SstFileMeta> files = new ArrayList<>();
+        List<LevelSortedRun> runs = levelSortedRuns();
+        for (LevelSortedRun run : runs) {
+            files.addAll(run.run().files());
+        }
+        return files;
+    }
+
+    public List<LevelSortedRun> levelSortedRuns() {
+        List<LevelSortedRun> runs = new ArrayList<>();
+        level0.forEach(file -> runs.add(new LevelSortedRun(0, SortedRun.fromSingle(file))));
+        for (int i = 0; i < levels.size(); i++) {
+            SortedRun run = levels.get(i);
+            if (run.nonEmpty()) {
+                runs.add(new LevelSortedRun(i + 1, run));
+            }
+        }
+        return runs;
+    }
+
+    public void update(List<SstFileMeta> before, List<SstFileMeta> after) {
+        Map<Integer, List<SstFileMeta>> groupedBefore = groupByLevel(before);
+        Map<Integer, List<SstFileMeta>> groupedAfter = groupByLevel(after);
+        for (int i = 0; i < numberOfLevels(); i++) {
+            updateLevel(
+                    i,
+                    groupedBefore.getOrDefault(i, emptyList()),
+                    groupedAfter.getOrDefault(i, emptyList()));
+        }
+    }
+
+    private void updateLevel(int level, List<SstFileMeta> before, List<SstFileMeta> after) {
+        if (before.isEmpty() && after.isEmpty()) {
+            return;
+        }
+
+        if (level == 0) {
+            before.forEach(level0::remove);
+            level0.addAll(after);
+        } else {
+            List<SstFileMeta> files = new ArrayList<>(runOfLevel(level).files());
+            files.removeAll(before);
+            files.addAll(after);
+            levels.set(level - 1, SortedRun.fromUnsorted(files, keyComparator));
+        }
+    }
+
+    private Map<Integer, List<SstFileMeta>> groupByLevel(List<SstFileMeta> files) {
+        return files.stream()
+                .collect(Collectors.groupingBy(SstFileMeta::level, Collectors.toList()));
+    }
+}
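
For orientation: Levels keeps every level-0 file as its own sorted run
(ordered by descending max sequence number) and at most one SortedRun per
level above that. A minimal usage sketch, assuming a single int key column;
the newFile helper is hypothetical and mirrors the SstFileMeta constructor
call used in the test at the end of this commit:

    // Three levels: two overlapping files on level 0, one run on level 2.
    Comparator<RowData> cmp = Comparator.comparingInt(r -> r.getInt(0));
    List<SstFileMeta> input =
            Arrays.asList(newFile(0, 1, 3, 10L), newFile(0, 1, 5, 11L), newFile(2, 1, 7, 5L));
    Levels levels = new Levels(cmp, input, 3);

    levels.numberOfLevels();       // 3
    levels.nonEmptyHighestLevel(); // 2
    levels.levelSortedRuns();      // two level-0 runs (newest first), then the level-2 run
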
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
new file mode 100644
index 0000000..f20bae4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.Levels;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import static java.util.Collections.singletonList;
+
+/** Manager to submit compaction tasks. */
+public class CompactManager {
+
+    private final ExecutorService executor;
+
+    private final CompactStrategy strategy;
+
+    private final Comparator<RowData> keyComparator;
+
+    private final long minFileSize;
+
+    private final Rewriter rewriter;
+
+    private Future<CompactResult> taskFuture;
+
+    public CompactManager(
+            ExecutorService executor,
+            CompactStrategy strategy,
+            Comparator<RowData> keyComparator,
+            long minFileSize,
+            Rewriter rewriter) {
+        this.executor = executor;
+        this.minFileSize = minFileSize;
+        this.keyComparator = keyComparator;
+        this.strategy = strategy;
+        this.rewriter = rewriter;
+    }
+
+    /** Submit a new compaction task. */
+    public void submitCompaction(Levels levels) {
+        if (taskFuture != null) {
+            throw new IllegalStateException(
+                    "Please finish the previous compaction before submitting 
new one.");
+        }
+        strategy.pick(levels.numberOfLevels(), levels.levelSortedRuns())
+                .ifPresent(
+                        unit -> {
+                            if (unit.files().size() < 2) {
+                                return;
+                            }
+                            boolean dropDelete =
+                                    unit.outputLevel() >= levels.nonEmptyHighestLevel();
+                            CompactTask task = new CompactTask(unit, dropDelete);
+                            taskFuture = executor.submit(task);
+                        });
+    }
+
+    /** Finish the current task, and update the result files in {@link Levels}. */
+    public Optional<CompactResult> finishCompaction(Levels levels)
+            throws ExecutionException, InterruptedException {
+        if (taskFuture != null) {
+            CompactResult result = taskFuture.get();
+            levels.update(result.before(), result.after());
+            taskFuture = null;
+            return Optional.of(result);
+        }
+        return Optional.empty();
+    }
+
+    /** Rewrite sections into new files. */
+    @FunctionalInterface
+    public interface Rewriter {
+
+        List<SstFileMeta> rewrite(
+                int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)
+                throws Exception;
+    }
+
+    /** Result of compaction. */
+    public interface CompactResult {
+
+        List<SstFileMeta> before();
+
+        List<SstFileMeta> after();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Internal classes
+    // --------------------------------------------------------------------------------------------
+
+    /** Compaction task. */
+    private class CompactTask implements Callable<CompactResult> {
+
+        private final int outputLevel;
+
+        private final List<List<SortedRun>> partitioned;
+
+        private final boolean dropDelete;
+
+        private CompactTask(CompactUnit unit, boolean dropDelete) {
+            this.outputLevel = unit.outputLevel();
+            this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
+            this.dropDelete = dropDelete;
+        }
+
+        @Override
+        public CompactResult call() throws Exception {
+            return compact();
+        }
+
+        private CompactResult compact() throws Exception {
+            List<List<SortedRun>> candidate = new ArrayList<>();
+            List<SstFileMeta> before = new ArrayList<>();
+            List<SstFileMeta> after = new ArrayList<>();
+
+            // Check the order and compact adjacent and contiguous files.
+            // Note: we cannot skip an intermediate file to compact; doing so
+            // would destroy the overall ordering.
+            for (List<SortedRun> section : partitioned) {
+                if (section.size() > 1) {
+                    candidate.add(section);
+                } else {
+                    SortedRun run = section.get(0);
+                    // No overlapping:
+                    // We can simply upgrade the large file, changing its level
+                    // instead of rewriting it.
+                    // But for small files, we still try to compact them.
+                    for (SstFileMeta file : run.files()) {
+                        if (file.fileSize() < minFileSize) {
+                            // Smaller files are rewritten along with the previous files.
+                            candidate.add(singletonList(SortedRun.fromSingle(file)));
+                        } else {
+                            // A large file appears: rewrite the previous candidates and upgrade it.
+                            rewrite(candidate, before, after);
+                            upgrade(file, before, after);
+                        }
+                    }
+                }
+            }
+            rewrite(candidate, before, after);
+            return result(before, after);
+        }
+
+        private void upgrade(SstFileMeta file, List<SstFileMeta> before, List<SstFileMeta> after) {
+            if (file.level() != outputLevel) {
+                before.add(file);
+                after.add(file.upgrade(outputLevel));
+            }
+        }
+
+        private void rewrite(
+                List<List<SortedRun>> candidate, List<SstFileMeta> before, List<SstFileMeta> after)
+                throws Exception {
+            if (candidate.isEmpty()) {
+                return;
+            }
+            if (candidate.size() == 1) {
+                List<SortedRun> section = candidate.get(0);
+                if (section.size() == 0) {
+                    return;
+                } else if (section.size() == 1) {
+                    for (SstFileMeta file : section.get(0).files()) {
+                        upgrade(file, before, after);
+                    }
+                    candidate.clear();
+                    return;
+                }
+            }
+            candidate.forEach(runs -> runs.forEach(run -> before.addAll(run.files())));
+            after.addAll(rewriter.rewrite(outputLevel, dropDelete, candidate));
+            candidate.clear();
+        }
+
+        private CompactResult result(List<SstFileMeta> before, List<SstFileMeta> after) {
+            return new CompactResult() {
+                @Override
+                public List<SstFileMeta> before() {
+                    return before;
+                }
+
+                @Override
+                public List<SstFileMeta> after() {
+                    return after;
+                }
+            };
+        }
+    }
+}
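
The manager runs at most one task at a time: submitCompaction asks the
strategy for a CompactUnit and schedules it on the executor, and
finishCompaction must be called to block on the result and fold it back
into Levels before the next submit. A minimal lifecycle sketch; the
strategy lambda and the myMergeAndWrite helper below are illustrative
stand-ins, not part of this commit:

    ExecutorService pool = Executors.newSingleThreadExecutor();
    // Illustrative strategy: always compact every run into the highest level.
    CompactStrategy strategy =
            (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
    // myMergeAndWrite (hypothetical) merge-sorts the sections and writes the
    // result files for outputLevel, honoring dropDelete.
    CompactManager.Rewriter rewriter =
            (outputLevel, dropDelete, sections) -> myMergeAndWrite(outputLevel, dropDelete, sections);

    CompactManager manager = new CompactManager(pool, strategy, keyComparator, 1024L, rewriter);
    manager.submitCompaction(levels);                 // asynchronous
    Optional<CompactManager.CompactResult> result =
            manager.finishCompaction(levels);         // blocks, then updates levels
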
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
new file mode 100644
index 0000000..560e298
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+import org.apache.flink.table.store.file.mergetree.Levels;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CompactManager}. */
+public class CompactManagerTest {
+
+    private final Comparator<RowData> comparator = Comparator.comparingInt(o -> o.getInt(0));
+
+    private static ExecutorService service;
+
+    @BeforeAll
+    public static void before() {
+        service = Executors.newSingleThreadExecutor();
+    }
+
+    @AfterAll
+    public static void after() {
+        service.shutdownNow();
+        service = null;
+    }
+
+    @Test
+    public void testCompactToPenultimateLayer() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 3),
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(2, 1, 7)),
+                Arrays.asList(new LevelMinMax(1, 1, 5), new LevelMinMax(2, 1, 7)),
+                new CompactStrategy() {
+                    @Override
+                    public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
+                        return Optional.of(CompactUnit.fromLevelRuns(1, runs.subList(0, 2)));
+                    }
+                },
+                false);
+    }
+
+    @Test
+    public void testNoCompaction() throws ExecutionException, InterruptedException {
+        innerTest(
+                Collections.singletonList(new LevelMinMax(0, 1, 3)),
+                Collections.singletonList(new LevelMinMax(0, 1, 3)));
+    }
+
+    @Test
+    public void testNormal() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 3),
+                        new LevelMinMax(1, 1, 5),
+                        new LevelMinMax(1, 6, 7)),
+                Arrays.asList(new LevelMinMax(2, 1, 5), new LevelMinMax(2, 6, 7)));
+    }
+
+    @Test
+    public void testUpgrade() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 3),
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(0, 6, 8)),
+                Arrays.asList(new LevelMinMax(2, 1, 5), new LevelMinMax(2, 6, 8)));
+    }
+
+    @Test
+    public void testSmallFiles() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(new LevelMinMax(0, 1, 1), new LevelMinMax(0, 2, 2)),
+                Collections.singletonList(new LevelMinMax(2, 1, 2)));
+    }
+
+    @Test
+    public void testSmallFilesNoCompact() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(0, 6, 6),
+                        new LevelMinMax(1, 7, 8),
+                        new LevelMinMax(1, 9, 10)),
+                Arrays.asList(
+                        new LevelMinMax(2, 1, 5),
+                        new LevelMinMax(2, 6, 6),
+                        new LevelMinMax(2, 7, 8),
+                        new LevelMinMax(2, 9, 10)));
+    }
+
+    @Test
+    public void testSmallFilesCrossLevel() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(0, 6, 6),
+                        new LevelMinMax(1, 7, 7),
+                        new LevelMinMax(1, 9, 10)),
+                Arrays.asList(
+                        new LevelMinMax(2, 1, 5),
+                        new LevelMinMax(2, 6, 7),
+                        new LevelMinMax(2, 9, 10)));
+    }
+
+    @Test
+    public void testComplex() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(0, 6, 6),
+                        new LevelMinMax(1, 1, 4),
+                        new LevelMinMax(1, 6, 8),
+                        new LevelMinMax(1, 10, 11),
+                        new LevelMinMax(2, 1, 3),
+                        new LevelMinMax(2, 4, 6)),
+                Arrays.asList(new LevelMinMax(2, 1, 8), new LevelMinMax(2, 10, 11)));
+    }
+
+    @Test
+    public void testSmallInComplex() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(0, 6, 6),
+                        new LevelMinMax(1, 1, 4),
+                        new LevelMinMax(1, 6, 8),
+                        new LevelMinMax(1, 10, 10),
+                        new LevelMinMax(2, 1, 3),
+                        new LevelMinMax(2, 4, 6)),
+                Collections.singletonList(new LevelMinMax(2, 1, 10)));
+    }
+
+    private void innerTest(List<LevelMinMax> inputs, List<LevelMinMax> expected)
+            throws ExecutionException, InterruptedException {
+        innerTest(inputs, expected, testStrategy(), true);
+    }
+
+    private void innerTest(
+            List<LevelMinMax> inputs,
+            List<LevelMinMax> expected,
+            CompactStrategy strategy,
+            boolean expectedDropDelete)
+            throws ExecutionException, InterruptedException {
+        List<SstFileMeta> files = new ArrayList<>();
+        for (int i = 0; i < inputs.size(); i++) {
+            LevelMinMax minMax = inputs.get(i);
+            files.add(minMax.toFile(i));
+        }
+        Levels levels = new Levels(comparator, files, 3);
+        CompactManager manager =
+                new CompactManager(
+                        service, strategy, comparator, 2, testRewriter(expectedDropDelete));
+        manager.submitCompaction(levels);
+        manager.finishCompaction(levels);
+        List<LevelMinMax> outputs =
+                levels.allFiles().stream().map(LevelMinMax::new).collect(Collectors.toList());
+        assertThat(outputs).isEqualTo(expected);
+    }
+
+    private static SstFileMeta newFile(int level, int minKey, int maxKey, long maxSequence) {
+        return new SstFileMeta(
+                "", maxKey - minKey + 1, 1, row(minKey), row(maxKey), null, 0, 
maxSequence, level);
+    }
+
+    public static BinaryRowData row(int i) {
+        BinaryRowData row = new BinaryRowData(1);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.writeInt(0, i);
+        writer.complete();
+        return row;
+    }
+
+    private CompactStrategy testStrategy() {
+        return (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
+    }
+
+    private CompactManager.Rewriter testRewriter(boolean expectedDropDelete) {
+        return (outputLevel, dropDelete, sections) -> {
+            assertThat(dropDelete).isEqualTo(expectedDropDelete);
+            int minKey = Integer.MAX_VALUE;
+            int maxKey = Integer.MIN_VALUE;
+            long maxSequence = 0;
+            for (List<SortedRun> section : sections) {
+                for (SortedRun run : section) {
+                    for (SstFileMeta file : run.files()) {
+                        int min = file.minKey().getInt(0);
+                        int max = file.maxKey().getInt(0);
+                        if (min < minKey) {
+                            minKey = min;
+                        }
+                        if (max > maxKey) {
+                            maxKey = max;
+                        }
+                        if (file.maxSequenceNumber() > maxSequence) {
+                            maxSequence = file.maxSequenceNumber();
+                        }
+                    }
+                }
+            }
+            return Collections.singletonList(newFile(outputLevel, minKey, maxKey, maxSequence));
+        };
+    }
+
+    private static class LevelMinMax {
+
+        private final int level;
+        private final int min;
+        private final int max;
+
+        private LevelMinMax(SstFileMeta file) {
+            this.level = file.level();
+            this.min = file.minKey().getInt(0);
+            this.max = file.maxKey().getInt(0);
+        }
+
+        private LevelMinMax(int level, int min, int max) {
+            this.level = level;
+            this.min = min;
+            this.max = max;
+        }
+
+        private SstFileMeta toFile(long maxSequence) {
+            return newFile(level, min, max, maxSequence);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            LevelMinMax that = (LevelMinMax) o;
+            return level == that.level && min == that.min && max == that.max;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(level, min, max);
+        }
+
+        @Override
+        public String toString() {
+            return "LevelMinMax{" + "level=" + level + ", min=" + min + ", 
max=" + max + '}';
+        }
+    }
+}
