This is an automated email from the ASF dual-hosted git repository. spricoder pushed a commit to branch object_type_tiff in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 91ef9b5f3c2109fd825cceb85081542a65db8b44 Author: spricoder <[email protected]> AuthorDate: Tue Sep 9 17:37:43 2025 +0800 Revert "更新优化版" This reverts commit 55723c736a2b6b136a3699adf34936d0993ed49a. --- .../rpc/model/CompressedTiffModelProcessor.java | 106 ++++------- .../db/utils/model/CompressedTiffModelReader.java | 209 +++------------------ 2 files changed, 60 insertions(+), 255 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTiffModelProcessor.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTiffModelProcessor.java index e6f351d5bcf..11c937e1160 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTiffModelProcessor.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/model/CompressedTiffModelProcessor.java @@ -26,25 +26,19 @@ import org.gdal.gdal.Driver; import org.gdal.gdal.gdal; import org.gdal.gdalconst.gdalconstConstants; -import java.util.ArrayList; -import java.util.List; - public class CompressedTiffModelProcessor extends ModelProcessor { private static final Driver DRIVER; private static final String VIRTUAL_FILE_PATH_PREFIX = "/vsimem"; private static final String VIRTUAL_FILE_PATH_SUFFIX = ".tif"; - // ---- 写入压缩配置(核心配置)---- - private static final String COMPRESS = "ZSTD"; // 备选:"DEFLATE" / "LZW" - private static final int ZSTD_LEVEL = 3; // 仅 ZSTD 有效,可调 1~22 - private static final String PREDICTOR = "2"; // 浮点差分 - private static final int ROWS_PER_STRIP = 12; // 行随机读最佳;若 strip 过多可设 8/16/32 - - // 固定哨兵值作为 NoData(兼容面好于直接写 NaN) - private static final float NODATA_SENTINEL = -3.4028235e38f; + // Specifying compression options + private static String compressOption = "COMPRESS=LZW"; + // Specifying block x size options + private static String blockXSize = "BLOCKXSIZE=256"; + // Specifying block y size options + private static String blockYSize = "BLOCKYSIZE=256"; static { gdal.AllRegister(); - gdal.SetConfigOption("GDAL_NUM_THREADS", "ALL_CPUS"); DRIVER = gdal.GetDriverByName("GTiff"); if (DRIVER == null) { throw new RuntimeException("Failed to get GTiff driver: " + gdal.GetLastErrorMsg()); @@ -62,51 +56,31 @@ public class CompressedTiffModelProcessor extends ModelProcessor { } private byte[] write(String filePath, float[] values, int width, int height) { - if (values == null || values.length != (long) width * height) { - throw new IllegalArgumentException("values length must be width*height"); - } - - List<String> opts = new ArrayList<>(); - opts.add("BIGTIFF=IF_SAFER"); - opts.add("TILED=NO"); // 明确 strip 组织 - opts.add("BLOCKYSIZE=" + ROWS_PER_STRIP); // rows-per-strip - opts.add("PREDICTOR=" + PREDICTOR); - opts.add("NUM_THREADS=ALL_CPUS"); - if ("ZSTD".equalsIgnoreCase(COMPRESS)) { - opts.add("COMPRESS=ZSTD"); - opts.add("ZSTD_LEVEL=" + ZSTD_LEVEL); - } else if ("DEFLATE".equalsIgnoreCase(COMPRESS)) { - opts.add("COMPRESS=DEFLATE"); - } else if ("LZW".equalsIgnoreCase(COMPRESS)) { - opts.add("COMPRESS=LZW"); - } else { - throw new IllegalArgumentException("Unsupported COMPRESS=" + COMPRESS); - } - String[] options = opts.toArray(new String[0]); + // floating point data should use predictor 2 (for difference prediction), and use block storage + // (recommended for LZW) + String[] options = + new String[] {compressOption, "PREDICTOR=2", "TILED=YES", blockXSize, blockYSize}; - Dataset ds = null; + Dataset dataset = null; try { - ds = DRIVER.Create(filePath, width, height, 1, gdalconstConstants.GDT_Float32, options); - if (ds == null) { + // Create dataset with specified options + dataset = DRIVER.Create(filePath, width, height, 1, gdalconstConstants.GDT_Float32, options); + + if (dataset == null) { throw new RuntimeException("Failed to create dataset: " + gdal.GetLastErrorMsg()); } - Band band = ds.GetRasterBand(1); - - // 统一设置 NoData(固定哨兵值) - band.SetNoDataValue(NODATA_SENTINEL); - // 顺序写整幅数据(strip 组织下吞吐较好) - int err = band.WriteRaster(0, 0, width, height, values); - if (err != gdalconstConstants.CE_None) { - throw new RuntimeException("Failed to write data: " + gdal.GetLastErrorMsg()); + Band band = dataset.GetRasterBand(1); + int result = band.WriteRaster(0, 0, width, height, values); + if (result != gdalconstConstants.CE_None) { + throw new RuntimeException("Failed to write data to tiff file: " + gdal.GetLastErrorMsg()); } - band.FlushCache(); - ds.FlushCache(); + dataset.FlushCache(); return VsiGdalNative.vsiGetMemFileBuffer(filePath, true); } finally { - if (ds != null) { - ds.delete(); + if (dataset != null) { + dataset.delete(); } } } @@ -124,33 +98,23 @@ public class CompressedTiffModelProcessor extends ModelProcessor { @Override public float[] readAll(String filePath) { - Dataset ds = gdal.OpenShared(filePath, gdalconstConstants.GA_ReadOnly); - if (ds == null) { - throw new RuntimeException("Failed to open: " + gdal.GetLastErrorMsg()); + Dataset dataset = gdal.Open(filePath, gdalconstConstants.GA_ReadOnly); + if (dataset == null) { + throw new RuntimeException("Failed to open tiff file: " + gdal.GetLastErrorMsg()); } try { - Band band = ds.GetRasterBand(1); - int w = band.getXSize(), h = band.getYSize(); - float[] out = new float[(int) ((long) w * h)]; - int err = band.ReadRaster(0, 0, w, h, gdalconstConstants.GDT_Float32, out); - if (err != gdalconstConstants.CE_None) { - throw new RuntimeException("ReadRaster(all) failed: " + gdal.GetLastErrorMsg()); - } - - // NoData -> NaN - Double[] nd = new Double[1]; - band.GetNoDataValue(nd); - float nodata = (nd[0] == null || Double.isNaN(nd[0])) ? Float.NaN : nd[0].floatValue(); - if (!Float.isNaN(nodata)) { - for (int i = 0; i < out.length; i++) { - if (out[i] == nodata) { - out[i] = Float.NaN; - } - } + Band band = dataset.GetRasterBand(1); + if (band == null) { + throw new RuntimeException( + "Failed to get raster band from dataset" + gdal.GetLastErrorMsg()); } - return out; + int width = band.getXSize(); + int height = band.getYSize(); + float[] result = new float[width * height]; + band.ReadRaster(0, 0, width, height, gdalconstConstants.GDT_Float32, result); + return result; } finally { - ds.delete(); + dataset.delete(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTiffModelReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTiffModelReader.java index 9cbd42b130f..19ede8e44bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTiffModelReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/model/CompressedTiffModelReader.java @@ -27,207 +27,48 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; public class CompressedTiffModelReader extends ModelReader { private static final Logger LOGGER = LoggerFactory.getLogger(CompressedTiffModelReader.class); static { gdal.AllRegister(); - gdal.SetConfigOption("GDAL_NUM_THREADS", "ALL_CPUS"); - } - - // 线程本地复用的读缓冲,避免频繁分配 - private static final ThreadLocal<float[]> TL_SCRATCH = - ThreadLocal.withInitial(() -> new float[0]); - - private static float[] ensureCapacity(float[] buf, int need) { - if (buf.length >= need) return buf; - int cap = Math.max(buf.length * 2, need); - float[] n = new float[cap]; - TL_SCRATCH.set(n); - return n; - } - - /** 合并相邻/重叠区间;mergeGap=0 表示仅相邻或重叠才合并 */ - private static List<int[]> mergeRanges(List<int[]> ranges, int mergeGap) { - if (ranges.isEmpty()) return ranges; - ranges.sort(Comparator.comparingInt(a -> a[0])); - List<int[]> out = new ArrayList<>(); - int s = ranges.get(0)[0], e = ranges.get(0)[1]; - for (int i = 1; i < ranges.size(); i++) { - int ns = ranges.get(i)[0], ne = ranges.get(i)[1]; - if (ns <= e + 1 + mergeGap) { - e = Math.max(e, ne); - } else { - out.add(new int[] {s, e}); - s = ns; - e = ne; - } - } - out.add(new int[] {s, e}); - return out; + int cacheMax = gdal.GetCacheMax(); + LOGGER.info("GDAL Cache Max: {}", cacheMax); + gdal.SetCacheMax(cacheMax * 2); } @Override public List<float[]> penetrate(String filePath, List<List<Integer>> startAndEndTimeArray) { - if (startAndEndTimeArray == null || startAndEndTimeArray.isEmpty()) { - return Collections.emptyList(); - } - - // 为保证返回顺序与输入一致:先把输入解析成条目列表(带原始 index) - class Req { - final int idx; // 原始顺序 - final int startPix; // 像元索引(0..total-1) - final int endPix; // 像元索引(>=startPix) - int row, col0, col1; // 解析后的行号与列范围 - - Req(int idx, int s, int e) { - this.idx = idx; - this.startPix = Math.min(s, e); - this.endPix = Math.max(s, e); - } + Dataset dataset = gdal.Open(filePath, gdalconstConstants.GA_ReadOnly); + if (dataset == null) { + LOGGER.error("Failed to open tiff file: {}", gdal.GetLastErrorMsg()); + throw new RuntimeException("Failed to open tiff file: " + gdal.GetLastErrorMsg()); } - List<Req> reqs = new ArrayList<>(startAndEndTimeArray.size()); - for (int i = 0; i < startAndEndTimeArray.size(); i++) { - List<Integer> r = startAndEndTimeArray.get(i); - if (r == null || r.size() < 2) { - throw new IllegalArgumentException("Each range must be [start, end]."); - } - reqs.add(new Req(i, r.get(0), r.get(1))); - } - - Dataset ds = gdal.OpenShared(filePath, gdalconstConstants.GA_ReadOnly); - if (ds == null) { - throw new RuntimeException("Failed to open: " + gdal.GetLastErrorMsg()); - } - try { - Band band = ds.GetRasterBand(1); - final int width = band.getXSize(); - final int height = band.getYSize(); - final long total = (long) width * (long) height; - - // 解析行/列 & 校验仅同一行 - for (Req q : reqs) { - if (q.startPix < 0 || (long) q.endPix >= total) { - throw new IndexOutOfBoundsException( - String.format("Range [%d,%d] out of bounds [0,%d).", q.startPix, q.endPix, total)); - } - int sRow = q.startPix / width, sCol = q.startPix % width; - int eRow = q.endPix / width, eCol = q.endPix % width; - if (sRow != eRow) { - throw new IllegalArgumentException( - "Range crosses rows: [" + q.startPix + "," + q.endPix + "]"); - } - q.row = sRow; - q.col0 = sCol; - q.col1 = eCol; - } - - // NoData 配置 - Double[] nd = new Double[1]; - band.GetNoDataValue(nd); - final boolean needMapNoData = nd[0] != null && !Double.isNaN(nd[0]); - final float nodata = needMapNoData ? nd[0].floatValue() : Float.NaN; - - // 行 -> (合并前的列区间列表) - Map<Integer, List<int[]>> perRow = new LinkedHashMap<>(); - for (Req q : reqs) { - perRow.computeIfAbsent(q.row, k -> new ArrayList<>()).add(new int[] {q.col0, q.col1}); - } - - // 为每个请求预先分配目标数组,最后按 idx 顺序收集 - float[][] outputs = new float[reqs.size()][]; - for (Req q : reqs) { - outputs[q.idx] = new float[q.col1 - q.col0 + 1]; - } - - // mergeGap 可按需要调大,如 4/8(允许读取少量“间隙像元”换更少的 ReadRaster 次数) - final int mergeGap = 0; - - // 行内再建立“原始区间列表”(保持输入顺序),用于把窗口数据拆回去 - Map<Integer, List<Req>> rowReqs = new LinkedHashMap<>(); - for (Req q : reqs) rowReqs.computeIfAbsent(q.row, k -> new ArrayList<>()).add(q); - - // 逐行处理 - for (Map.Entry<Integer, List<int[]>> e : perRow.entrySet()) { - int row = e.getKey(); - List<int[]> ranges = e.getValue(); - - // 合并区间 -> 更少的读取窗口 - List<int[]> merged = mergeRanges(ranges, mergeGap); - - // 按起点排序,便于窗口覆盖 - List<Req> rowList = rowReqs.get(row); - rowList.sort(Comparator.comparingInt(a -> a.col0)); - int p = 0; - - for (int[] win : merged) { - int c0 = win[0], c1 = win[1]; - int winLen = c1 - c0 + 1; - - float[] scratch = ensureCapacity(TL_SCRATCH.get(), winLen); - int err = band.ReadRaster(c0, row, winLen, 1, gdalconstConstants.GDT_Float32, scratch); - if (err != gdalconstConstants.CE_None) { - throw new RuntimeException( - "ReadRaster(row=" - + row - + ", " - + c0 - + ":" - + c1 - + ") failed: " - + gdal.GetLastErrorMsg()); - } - - // 把窗口分发给所有落在其中的原始区间 - while (p < rowList.size()) { - Req q = rowList.get(p); - if (q.col1 < c0) { - p++; - continue; - } // 在窗口左侧,跳过 - if (q.col0 > c1) { - break; - } // 窗口右侧,进入下一个窗口 - - int from = Math.max(q.col0, c0); - int to = Math.min(q.col1, c1); - int len = to - from + 1; - - float[] dst = outputs[q.idx]; - System.arraycopy(scratch, from - c0, dst, from - q.col0, len); - - // 注意:若 mergeGap>0,极端情况下一个 req 可能跨两个合并窗口; - // 这里不 p++,而是仅当整个 req 覆盖完才前移指针 - if (to == q.col1) { - p++; - } // 该 req 已经完全覆盖 - else { - break; - } // 仍有剩余,等待下一窗口补齐 - } - } + Band band = dataset.GetRasterBand(1); + if (band == null) { + LOGGER.error("Failed to get raster band from dataset: {}", gdal.GetLastErrorMsg()); + throw new RuntimeException( + "Failed to get raster band from dataset" + gdal.GetLastErrorMsg()); } - - // NoData -> NaN - if (needMapNoData) { - for (float[] arr : outputs) { - for (int i = 0; i < arr.length; i++) if (arr[i] == nodata) arr[i] = Float.NaN; - } + int width = band.getXSize(); + List<float[]> result = new ArrayList<>(); + for (List<Integer> startAndEndTime : startAndEndTimeArray) { + int xIndex = startAndEndTime.get(0); + int yIndex = startAndEndTime.get(1); + float[] tmp = new float[yIndex - xIndex + 1]; + int xOff = xIndex % width; + int yOff = xIndex / width; + int xSize = yIndex - xIndex + 1; + int ySize = 1; + band.ReadRaster(xOff, yOff, xSize, ySize, gdalconstConstants.GDT_Float32, tmp); + result.add(tmp); } - - // 按原始顺序返回 - List<float[]> result = new ArrayList<>(outputs.length); - for (int i = 0; i < outputs.length; i++) result.add(outputs[i]); return result; } finally { - ds.delete(); + dataset.delete(); } } }
