Thanks, Junhao, for your proposal. Can you send a PIP discussion email in the formal format? Just like https://lists.apache.org/thread/dyq1jyrj8cqb26z84bhqr7xx1pn7ctj8
And the content of the PIP can be polished into the formal format too.

Best,
Jingsong

On Mon, May 22, 2023 at 4:49 PM JUNHAO YE <[email protected]> wrote:
> Hi, guys.
> I want to start a conversation about improving Paimon batch processing. See
> https://cwiki.apache.org/confluence/display/PAIMON/PIP-6%3A+Batch+Table+Without+Bucket
>
> *PREFACE:*
>
> Currently, Paimon has very good support for streaming writes and streaming
> reads, but not enough for traditional batch processing. After creating a
> table, you need to explicitly specify the bucket key and bucket number;
> otherwise, an append-only table or changelog table with a single bucket is
> created. With only one bucket, concurrent read/write and compaction cannot
> be performed, resulting in poor batch performance.
> In a traditional offline warehouse, users do not care about buckets and do
> not want to care about buckets. For offline batch computing, the bucket
> concept that Paimon created for real-time streaming reads and writes does
> not apply. Without streaming reads and writes, the record ordering
> guaranteed by sequence.field does not mean much, because without streaming
> retraction, only concurrently inserted data flows into the table; the rest
> of the processing is done in batch mode.
>
> Therefore, we need to design a table that meets the following requirements:
> 1. No concept of primary key or bucket key; the table is equivalent to an
> ordinary table in an offline warehouse.
> 2. Streaming mode only supports inserting data (only "+I" records). As an
> offline table, data can be inserted synchronously in real time, but there
> is no need for real-time delete, update, etc.
> 3. Streaming writes support concurrent writing and concurrent compaction.
> 4. Data is updated and deleted through batch jobs.
> 5. The read order of records is not guaranteed to match the write order.
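Requirement 2 above ("+I" records only) could be sketched roughly as below. This is a minimal, self-contained illustration, not Paimon's actual API; the class and method names are assumptions for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a bucket-less append writer that accepts only
// +I records, per requirement 2 above. Names are illustrative.
public class AppendOnlyWriterSketch {

    // Mirrors the usual CDC row kinds; only INSERT is accepted in this mode.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Row(RowKind kind, Object[] fields) {}

    private final List<Row> buffer = new ArrayList<>();

    public void write(Row row) {
        if (row.kind() != RowKind.INSERT) {
            throw new IllegalArgumentException(
                "bucket-less batch table only accepts +I records, got: " + row.kind());
        }
        // All data goes to the single default directory (bucket-0).
        buffer.add(row);
    }

    public int bufferedCount() {
        return buffer.size();
    }

    public static void main(String[] args) {
        AppendOnlyWriterSketch w = new AppendOnlyWriterSketch();
        w.write(new Row(RowKind.INSERT, new Object[]{1, "apple"}));
        try {
            w.write(new Row(RowKind.DELETE, new Object[]{1, "apple"}));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println("buffered=" + w.bufferedCount());
    }
}
```

The point of the sketch is only the contract: non-insert row kinds are rejected at write time instead of being silently merged, since there is no primary key to merge on.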
>
> *SCENARIO:*
>
> A customer has a large amount of order transaction data and expects it to
> flow automatically into an offline table whenever transaction data arrives,
> with batch statistics computed at 12 o'clock every day.
> With the Flink computing engine, when only an offline batch table is
> needed, we can create the table as:
>
> CREATE TABLE Orders (
>     order_id INT,
>     order_type STRING,
>     `date` TIMESTAMP,
>     price INT,
>     number INT
> ) PARTITIONED BY (order_type)
> WITH (
>     'write-mode' = 'table'
> );
>
> There is no property about the bucket.
>
> We want this table to be used for offline OLAP analysis, such as
> once-a-day statistics. But its data volume and traffic are large, so we
> want it to keep itself up to date:
>
> INSERT INTO Orders
> SELECT * FROM OrderSource;
>
> Run an offline analytical query to calculate the total sales in one day:
>
> SELECT sum(price * number) FROM Orders
> GROUP BY DATE_FORMAT(`date`, 'yyyyMMdd');
>
> Count the number of transactions per order type:
>
> SELECT order_type, count(*) FROM Orders GROUP BY order_type;
>
> An order type has changed, and all historical data needs to be changed
> (flink-version >= 1.17):
>
> UPDATE Orders SET order_type = 'banana'
> WHERE order_type = 'apple' AND `date` > TO_TIMESTAMP('2020-02-02', 'yyyy-MM-dd');
>
> *DESIGN:*
>
> At the Paimon project level, we need a new table and a new write mode. In
> this mode:
>
> 1. +I data only; no +U, -U, or -D data types.
>
> 2. All data goes to the default bucket-0 file directory (for consistency
> with the previous design).
>
> 3. No sequence.number; ordered reading and writing of data is not required.
>
> 4. Compaction is separated from the writer; the writer is no longer
> responsible for compaction at the same time. This solves the problem of
> compacting a single bucket that is written concurrently (write-only=true
> by default when building the table).
>
> 5.
> Create new members: CompactionCoordinator and CompactionWorker.
> The CompactionCoordinator is a single-parallelism coordinator that
> receives the files written by upstream writers. The CompactionWorker is a
> multi-parallelism compaction executor that runs only the compaction tasks
> assigned by the coordinator.
>
> At the computing engine level, we build the following topology for
> real-time writing:
>
> [image: image.png]
>
> [image: pasted graphic 2]
>
> 1. In the prepareSnapshot phase, the writers flush their new files and the
> compaction coordinator receives them. The compaction coordinator also
> reads the last delta files from the latest snapshot and adds them to its
> restored files. Then, depending on a strategy, it decides whether to
> create a compaction task.
>
> 2. The compaction coordinator delivers compaction tasks. Each compaction
> worker executes its tasks, based on the compactions that occur and the new
> files submitted by the writers, and builds the commit messages. After
> execution, the commit messages are passed to the downstream committer.
>
> 3. In the snapshot phase, each operator saves its state.
>
> 4. In the notify-checkpoint-complete phase, the committer commits the
> compacted file information and generates a new snapshot. The next time the
> compaction coordinator prepares a snapshot, it reads the snapshot delta
> (and updates its restored files based on the saved snapshot id).
>
> For scenarios such as delete and update, batch mode is used. Row kinds
> such as +U, -U, and -D are not used when deleting and updating; real file
> replacement is used instead. After each delete or update operation, a new
> snapshot is generated.
>
> Please write back with your opinions.
>
> Best,
> Junhao.
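The coordinator/worker split described above could be sketched roughly as follows. All names and the trigger strategy (compact once N files accumulate) are assumptions for illustration; the real design tracks file metadata from snapshots and a much richer strategy.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the single-parallelism coordinator /
// multi-parallelism worker split. Not Paimon's actual classes.
public class CompactionSketch {

    record DataFile(String name, long sizeBytes) {}
    record CompactionTask(List<DataFile> inputs) {}

    // Coordinator: collects files reported by writers and, once enough
    // small files accumulate, emits one compaction task covering them.
    static class CompactionCoordinator {
        private final List<DataFile> pending = new ArrayList<>();
        private final int trigger;

        CompactionCoordinator(int trigger) { this.trigger = trigger; }

        List<CompactionTask> onNewFile(DataFile file) {
            pending.add(file);
            if (pending.size() >= trigger) {
                CompactionTask task = new CompactionTask(new ArrayList<>(pending));
                pending.clear();
                return List.of(task);
            }
            return List.of();
        }
    }

    // Worker: executes an assigned task by "rewriting" its inputs into a
    // single larger file (here just summing sizes for illustration).
    static class CompactionWorker {
        DataFile execute(CompactionTask task) {
            long total = task.inputs().stream().mapToLong(DataFile::sizeBytes).sum();
            return new DataFile("compacted-" + task.inputs().size(), total);
        }
    }

    public static void main(String[] args) {
        CompactionCoordinator coordinator = new CompactionCoordinator(3);
        CompactionWorker worker = new CompactionWorker();
        for (int i = 0; i < 3; i++) {
            for (CompactionTask task : coordinator.onNewFile(new DataFile("f" + i, 100))) {
                DataFile out = worker.execute(task);
                System.out.println(out.name() + " size=" + out.sizeBytes());
            }
        }
        // → compacted-3 size=300
    }
}
```

Because only the coordinator decides which files go into a task, concurrently running workers never pick overlapping inputs, which is what allows compaction to run in parallel with concurrent writers on a single bucket.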
