Jackie-Jiang commented on code in PR #10927:
URL: https://github.com/apache/pinot/pull/10927#discussion_r1291838451
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -60,29 +63,59 @@ public PartialUpsertHandler(Schema schema, Map<String,
UpsertConfig.Strategy> pa
* For example, overwrite merger will only override the prev value if the
new value is not null.
* Null values will override existing values if not configured. They can be
ignored by using ignoreMerger.
*
- * @param previousRecord the last derived full record during ingestion.
+ * @param indexSegment the segment of the last derived full record during
ingestion.
+ * @param docId the docId of the last derived full record during ingestion
in the segment.
* @param newRecord the new consumed record.
- * @return a new row after merge
*/
- public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
- for (String column : previousRecord.getFieldToValueMap().keySet()) {
+ public void merge(IndexSegment indexSegment, int docId, GenericRow
newRecord) {
+ for (String column: indexSegment.getColumnNames()) {
Review Comment:
(format) Reformat the file with Pinot Style
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -60,29 +63,59 @@ public PartialUpsertHandler(Schema schema, Map<String,
UpsertConfig.Strategy> pa
* For example, overwrite merger will only override the prev value if the
new value is not null.
* Null values will override existing values if not configured. They can be
ignored by using ignoreMerger.
*
- * @param previousRecord the last derived full record during ingestion.
+ * @param indexSegment the segment of the last derived full record during
ingestion.
+ * @param docId the docId of the last derived full record during ingestion
in the segment.
* @param newRecord the new consumed record.
- * @return a new row after merge
*/
- public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
- for (String column : previousRecord.getFieldToValueMap().keySet()) {
+ public void merge(IndexSegment indexSegment, int docId, GenericRow
newRecord) {
+ for (String column: indexSegment.getColumnNames()) {
if (!_primaryKeyColumns.contains(column)) {
- if (!previousRecord.isNullValue(column)) {
+ PartialUpsertMerger merger = _column2Mergers.getOrDefault(column,
_defaultPartialUpsertMerger);
+ // Non-overwrite mergers
+ // (1) If the value of the previous is null value, skip merging and
use the new value
+ // (2) Else If the value of new value is null, use the previous value
(even for comparison columns).
+ // (3) Else If the column is not a comparison column, we applied the
merged value to it.
+ if (!(merger instanceof OverwriteMerger)) {
+ PinotSegmentColumnReader pinotSegmentColumnReader = new
PinotSegmentColumnReader(indexSegment, column);
Review Comment:
We can use try-with-resources here.
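
To illustrate the suggestion, here is a minimal sketch of the try-with-resources shape. It assumes the reader implements `Closeable`, which the `close()` call throwing `IOException` in the diff suggests. `FakeColumnReader` and `readOrDefault` are hypothetical stand-ins written for this example, not Pinot classes or methods.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the column reader; only the Closeable contract matters here.
final class FakeColumnReader implements Closeable {
  private final Object[] _values;
  private boolean _closed;

  FakeColumnReader(Object[] values) {
    _values = values;
  }

  boolean isNull(int docId) {
    return _values[docId] == null;
  }

  Object getValue(int docId) {
    return _values[docId];
  }

  boolean isClosed() {
    return _closed;
  }

  @Override
  public void close() throws IOException {
    _closed = true;
  }
}

final class TryWithResourcesSketch {
  private TryWithResourcesSketch() {
  }

  // Returns the stored value for docId, or the fallback when the stored value is null.
  // The reader is closed on every path, including when the body throws.
  static Object readOrDefault(FakeColumnReader reader, int docId, Object fallback) {
    try (FakeColumnReader r = reader) {
      return r.isNull(docId) ? fallback : r.getValue(docId);
    } catch (IOException e) {
      // Assumption: wrap the checked exception, as the explicit close() in the diff does.
      throw new UncheckedIOException("Caught exception while closing column reader", e);
    }
  }

  public static void main(String[] args) {
    FakeColumnReader reader = new FakeColumnReader(new Object[]{42, null});
    System.out.println(readOrDefault(reader, 0, -1));
    System.out.println(reader.isClosed());
  }
}
```

Compared with the explicit `close()` in a separate `try` block, this form also closes the reader if `isNull` or `getValue` throws.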
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -60,29 +63,59 @@ public PartialUpsertHandler(Schema schema, Map<String,
UpsertConfig.Strategy> pa
* For example, overwrite merger will only override the prev value if the
new value is not null.
* Null values will override existing values if not configured. They can be
ignored by using ignoreMerger.
*
- * @param previousRecord the last derived full record during ingestion.
+ * @param indexSegment the segment of the last derived full record during
ingestion.
+ * @param docId the docId of the last derived full record during ingestion
in the segment.
* @param newRecord the new consumed record.
- * @return a new row after merge
*/
- public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
- for (String column : previousRecord.getFieldToValueMap().keySet()) {
+ public void merge(IndexSegment indexSegment, int docId, GenericRow
newRecord) {
+ for (String column: indexSegment.getColumnNames()) {
if (!_primaryKeyColumns.contains(column)) {
- if (!previousRecord.isNullValue(column)) {
+ PartialUpsertMerger merger = _column2Mergers.getOrDefault(column,
_defaultPartialUpsertMerger);
+ // Non-overwrite mergers
+ // (1) If the value of the previous is null value, skip merging and
use the new value
+ // (2) Else If the value of new value is null, use the previous value
(even for comparison columns).
+ // (3) Else If the column is not a comparison column, we applied the
merged value to it.
+ if (!(merger instanceof OverwriteMerger)) {
+ PinotSegmentColumnReader pinotSegmentColumnReader = new
PinotSegmentColumnReader(indexSegment, column);
+ if (!pinotSegmentColumnReader.isNull(docId)) {
+ Object previousValue = pinotSegmentColumnReader.getValue(docId);
+ if (newRecord.isNullValue(column)) {
+ // Note that we intentionally want to overwrite any previous
_comparisonColumn value in the case of using
+ // multiple comparison columns. We never apply a merge function
to it, rather we just take any/all
+ // non-null comparison column values from the previous record,
and the sole non-null comparison column
+ // value from the new record.
+ newRecord.putValue(column, previousValue);
+ } else if (!_comparisonColumns.contains(column)) {
+ newRecord.putValue(column,
+ merger.merge(previousValue, newRecord.getValue(column)));
+ }
+ }
+ try {
+ pinotSegmentColumnReader.close();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Caught exception while closing
pinotSegmentColumnReader for column: %s", column), e);
+ }
+ } else {
+ // Overwrite mergers.
+ // (1) If the merge strategy is Overwrite mergers and newValue is
not null. skip and use the new value
Review Comment:
```suggestion
// (1) If the merge strategy is Overwrite merger and newValue is
not null, skip and use the new value
```
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -60,29 +63,59 @@ public PartialUpsertHandler(Schema schema, Map<String,
UpsertConfig.Strategy> pa
* For example, overwrite merger will only override the prev value if the
new value is not null.
* Null values will override existing values if not configured. They can be
ignored by using ignoreMerger.
*
- * @param previousRecord the last derived full record during ingestion.
+ * @param indexSegment the segment of the last derived full record during
ingestion.
+ * @param docId the docId of the last derived full record during ingestion
in the segment.
* @param newRecord the new consumed record.
- * @return a new row after merge
*/
- public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
- for (String column : previousRecord.getFieldToValueMap().keySet()) {
+ public void merge(IndexSegment indexSegment, int docId, GenericRow
newRecord) {
+ for (String column: indexSegment.getColumnNames()) {
if (!_primaryKeyColumns.contains(column)) {
- if (!previousRecord.isNullValue(column)) {
+ PartialUpsertMerger merger = _column2Mergers.getOrDefault(column,
_defaultPartialUpsertMerger);
+ // Non-overwrite mergers
+ // (1) If the value of the previous is null value, skip merging and
use the new value
+ // (2) Else If the value of new value is null, use the previous value
(even for comparison columns).
+ // (3) Else If the column is not a comparison column, we applied the
merged value to it.
+ if (!(merger instanceof OverwriteMerger)) {
+ PinotSegmentColumnReader pinotSegmentColumnReader = new
PinotSegmentColumnReader(indexSegment, column);
+ if (!pinotSegmentColumnReader.isNull(docId)) {
+ Object previousValue = pinotSegmentColumnReader.getValue(docId);
+ if (newRecord.isNullValue(column)) {
+ // Note that we intentionally want to overwrite any previous
_comparisonColumn value in the case of using
+ // multiple comparison columns. We never apply a merge function
to it, rather we just take any/all
+ // non-null comparison column values from the previous record,
and the sole non-null comparison column
+ // value from the new record.
+ newRecord.putValue(column, previousValue);
+ } else if (!_comparisonColumns.contains(column)) {
+ newRecord.putValue(column,
+ merger.merge(previousValue, newRecord.getValue(column)));
+ }
+ }
+ try {
+ pinotSegmentColumnReader.close();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Caught exception while closing
pinotSegmentColumnReader for column: %s", column), e);
+ }
+ } else {
+ // Overwrite mergers.
+ // (1) If the merge strategy is Overwrite mergers and newValue is
not null. skip and use the new value
+ // (2) Otherwise, if previous is not null, init columnReader and use
the previous value.
if (newRecord.isNullValue(column)) {
- // Note that we intentionally want to overwrite any previous
_comparisonColumn value in the case of using
- // multiple comparison columns. We never apply a merge function to
it, rather we just take any/all non-null
- // comparison column values from the previous record, and the sole
non-null comparison column value from
- // the new record.
- newRecord.putValue(column, previousRecord.getValue(column));
- newRecord.removeNullValueField(column);
- } else if (!_comparisonColumns.contains(column)) {
- PartialUpsertMerger merger = _column2Mergers.getOrDefault(column,
_defaultPartialUpsertMerger);
- newRecord.putValue(column,
- merger.merge(previousRecord.getValue(column),
newRecord.getValue(column)));
+ PinotSegmentColumnReader pinotSegmentColumnReader = new
PinotSegmentColumnReader(indexSegment, column);
Review Comment:
Same here: use try-with-resources.
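
For the overwrite branch, the same idea could look roughly like the sketch below: the reader is only created when the new value is null, and try-with-resources closes it afterwards. The record is modelled as a plain `Map`, and `StubColumnReader`, `overwriteMerge`, and the `Supplier` factory are hypothetical names for illustration only. Null-value bookkeeping on the real record type is deliberately left out.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for the column reader.
final class StubColumnReader implements Closeable {
  private final Object[] _values;

  StubColumnReader(Object[] values) {
    _values = values;
  }

  boolean isNull(int docId) {
    return _values[docId] == null;
  }

  Object getValue(int docId) {
    return _values[docId];
  }

  @Override
  public void close() throws IOException {
    // Nothing to release in this stub.
  }
}

final class OverwriteBranchSketch {
  private OverwriteBranchSketch() {
  }

  // (1) If the new value is not null, keep it and never open a reader.
  // (2) Otherwise open a reader and, if the previous value is not null, copy it over.
  static void overwriteMerge(Map<String, Object> newRecord, String column, int docId,
      Supplier<StubColumnReader> readerFactory) {
    if (newRecord.get(column) != null) {
      return;
    }
    try (StubColumnReader reader = readerFactory.get()) {
      if (!reader.isNull(docId)) {
        newRecord.put(column, reader.getValue(docId));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(
          String.format("Caught exception while closing column reader for column: %s", column), e);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> newRecord = new HashMap<>();
    newRecord.put("clicks", null);
    overwriteMerge(newRecord, "clicks", 0, () -> new StubColumnReader(new Object[]{7}));
    System.out.println(newRecord.get("clicks"));
  }
}
```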
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]