This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.0.1 by this push:
new cedf2f240a [bug] fix window function nullable type bug
cedf2f240a is described below
commit cedf2f240a7f6fad3067f052db56760b0178e94b
Author: starocean999 <[email protected]>
AuthorDate: Tue Jun 21 18:42:21 2022 +0800
[bug] fix window function nullable type bug
The nullable side of an outer join should always produce nullable values.
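
A minimal sketch of the pattern this addresses (table and column names here are
illustrative, not taken verbatim from the regression case below): a NOT NULL column
coming from the nullable side of an outer join can still be NULL for unmatched rows,
so a window function over it has to produce a nullable result column.

    -- t2.v is declared NOT NULL, but t2 is the nullable side of the LEFT JOIN,
    -- so the slot for t2.v (and the LEAD over it) must be treated as nullable.
    SELECT t1.id,
           LEAD(t2.v, 1, '9999-12-30 00:00:00')
               OVER (PARTITION BY t1.id ORDER BY t2.v) AS next_v
    FROM t1
    LEFT JOIN t2 ON t1.id = t2.id;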
---
be/src/vec/exec/join/vhash_join_node.cpp | 42 ++-
be/src/vec/exec/vaggregation_node.cpp | 60 ++++-
be/src/vec/exec/vaggregation_node.h | 3 +
be/src/vec/exec/vanalytic_eval_node.cpp | 11 +-
.../java/org/apache/doris/analysis/Analyzer.java | 6 -
.../test_outer_join_with_window_function.out | 4 +
.../test_outer_join_with_window_function.groovy | 289 +++++++++++++++++++++
7 files changed, 392 insertions(+), 23 deletions(-)
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index d91ff2d22b..aefe93e89f 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -190,10 +190,15 @@ struct ProcessHashTableProbe {
         if constexpr (!is_semi_anti_join || have_other_join_conjunct) {
             if (_build_blocks.size() == 1) {
                 for (int i = 0; i < column_length; i++) {
-                    auto& column = *_build_blocks[0].get_by_position(i).column;
                     if (output_slot_flags[i]) {
-                        mcol[i + column_offset]->insert_indices_from(column, _build_block_rows.data(),
-                                                                     _build_block_rows.data() + size);
+                        auto column = _build_blocks[0].get_by_position(i).column;
+                        if (mcol[i + column_offset]->is_nullable() xor column->is_nullable()) {
+                            DCHECK(mcol[i + column_offset]->is_nullable() &&
+                                   !column->is_nullable());
+                            column = make_nullable(column);
+                        }
+                        mcol[i + column_offset]->insert_indices_from(
+                                *column, _build_block_rows.data(), _build_block_rows.data() + size);
                     } else {
                         mcol[i + column_offset]->resize(size);
                     }
@@ -208,12 +213,29 @@ struct ProcessHashTableProbe {
                                     assert_cast<ColumnNullable *>(
                                             mcol[i + column_offset].get())->insert_join_null_data();
                                 } else {
-                                    auto &column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
-                                    mcol[i + column_offset]->insert_from(column, _build_block_rows[j]);
+                                    auto column = _build_blocks[_build_block_offsets[j]]
+                                                          .get_by_position(i)
+                                                          .column;
+                                    if (mcol[i + column_offset]->is_nullable() xor
+                                        column->is_nullable()) {
+                                        DCHECK(mcol[i + column_offset]->is_nullable() &&
+                                               !column->is_nullable());
+                                        column = make_nullable(column);
+                                    }
+                                    mcol[i + column_offset]->insert_from(*column,
+                                                                         _build_block_rows[j]);
                                 }
                             } else {
-                                auto &column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
-                                mcol[i + column_offset]->insert_from(column, _build_block_rows[j]);
+                                auto column = _build_blocks[_build_block_offsets[j]]
+                                                      .get_by_position(i)
+                                                      .column;
+                                if (mcol[i + column_offset]->is_nullable() xor
+                                    column->is_nullable()) {
+                                    DCHECK(mcol[i + column_offset]->is_nullable() &&
+                                           !column->is_nullable());
+                                    column = make_nullable(column);
+                                }
+                                mcol[i + column_offset]->insert_from(*column, _build_block_rows[j]);
                             }
                         }
                     } else {
@@ -228,7 +250,11 @@ struct ProcessHashTableProbe {
     void probe_side_output_column(MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size) {
         for (int i = 0; i < output_slot_flags.size(); ++i) {
             if (output_slot_flags[i]) {
-                auto& column = _probe_block.get_by_position(i).column;
+                auto column = _probe_block.get_by_position(i).column;
+                if (mcol[i]->is_nullable() xor column->is_nullable()) {
+                    DCHECK(mcol[i]->is_nullable() && !column->is_nullable());
+                    column = make_nullable(column);
+                }
                 column->replicate(&_items_counts[0], size, *mcol[i]);
             } else {
                 mcol[i]->resize(size);
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 8230d4d697..def5f56ff7 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -851,6 +851,10 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
             key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
         }
     }
+
+    MutableColumns temp_key_columns = _create_temp_key_columns();
+    DCHECK(temp_key_columns.size() == key_size);
+
     MutableColumns value_columns;
     for (int i = key_size; i < column_withschema.size(); ++i) {
         if (!mem_reuse) {
@@ -860,19 +864,24 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
         }
     }
+    MutableColumns temp_value_columns = _create_temp_value_columns();
+    DCHECK(temp_value_columns.size() == _aggregate_evaluators.size() &&
+           _aggregate_evaluators.size() == column_withschema.size() - key_size);
+
     SCOPED_TIMER(_get_results_timer);
     std::visit(
             [&](auto&& agg_method) -> void {
                 auto& data = agg_method.data;
                 auto& iter = agg_method.iterator;
                 agg_method.init_once();
-                while (iter != data.end() && key_columns[0]->size() < state->batch_size()) {
+                while (iter != data.end() && temp_key_columns[0]->size() < state->batch_size()) {
                     const auto& key = iter->get_first();
                     auto& mapped = iter->get_second();
-                    agg_method.insert_key_into_columns(key, key_columns, _probe_key_sz);
+                    agg_method.insert_key_into_columns(key, temp_key_columns, _probe_key_sz);
                     for (size_t i = 0; i < _aggregate_evaluators.size(); ++i)
                         _aggregate_evaluators[i]->insert_result_info(
-                                mapped + _offsets_of_aggregate_states[i], value_columns[i].get());
+                                mapped + _offsets_of_aggregate_states[i],
+                                temp_value_columns[i].get());
                     ++iter;
                 }
@@ -880,15 +889,15 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
                     if (agg_method.data.has_null_key_data()) {
                         // only one key of group by support wrap null key
                         // here need additional processing logic on the null key / value
-                        DCHECK(key_columns.size() == 1);
-                        DCHECK(key_columns[0]->is_nullable());
-                        if (key_columns[0]->size() < state->batch_size()) {
-                            key_columns[0]->insert_data(nullptr, 0);
+                        DCHECK(temp_key_columns.size() == 1);
+                        DCHECK(temp_key_columns[0]->is_nullable());
+                        if (temp_key_columns[0]->size() < state->batch_size()) {
+                            temp_key_columns[0]->insert_data(nullptr, 0);
                             auto mapped = agg_method.data.get_null_key_data();
                             for (size_t i = 0; i < _aggregate_evaluators.size(); ++i)
                                 _aggregate_evaluators[i]->insert_result_info(
                                         mapped + _offsets_of_aggregate_states[i],
-                                        value_columns[i].get());
+                                        temp_value_columns[i].get());
                             *eos = true;
                         }
                     } else {
@@ -898,6 +907,25 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
             },
             _agg_data._aggregated_method_variant);
+    for (int i = 0; i < key_size; ++i) {
+        if (key_columns[i]->is_nullable() xor temp_key_columns[i]->is_nullable()) {
+            DCHECK(key_columns[i]->is_nullable() && !temp_key_columns[i]->is_nullable());
+            key_columns[i] = (*std::move(make_nullable(std::move(temp_key_columns[i])))).mutate();
+        } else {
+            key_columns[i] = std::move(temp_key_columns[i]);
+        }
+    }
+
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        if (value_columns[i]->is_nullable() xor temp_value_columns[i]->is_nullable()) {
+            DCHECK(value_columns[i]->is_nullable() && !temp_value_columns[i]->is_nullable());
+            value_columns[i] =
+                    (*std::move(make_nullable(std::move(temp_value_columns[i])))).mutate();
+        } else {
+            value_columns[i] = std::move(temp_value_columns[i]);
+        }
+    }
+
     if (!mem_reuse) {
         *block = column_withschema;
         MutableColumns columns(block->columns());
@@ -1115,4 +1143,20 @@ void AggregationNode::release_tracker() {
     mem_tracker()->Release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena);
 }
+MutableColumns AggregationNode::_create_temp_key_columns() {
+    MutableColumns key_columns;
+    for (const auto& expr_ctx : _probe_expr_ctxs) {
+        key_columns.push_back(expr_ctx->root()->data_type()->create_column());
+    }
+    return key_columns;
+}
+
+MutableColumns AggregationNode::_create_temp_value_columns() {
+    MutableColumns key_columns;
+    for (const auto& agg : _aggregate_evaluators) {
+        key_columns.push_back(agg->data_type()->create_column());
+    }
+    return key_columns;
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index f020b90a6e..d2f580d327 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -484,6 +484,9 @@ private:
     void release_tracker();
+    MutableColumns _create_temp_key_columns();
+    MutableColumns _create_temp_value_columns();
+
     using vectorized_execute = std::function<Status(Block* block)>;
     using vectorized_pre_agg = std::function<Status(Block* in_block, Block* out_block)>;
     using vectorized_get_result =
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 65765d15d3..858e330efe 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -544,7 +544,16 @@ Status VAnalyticEvalNode::_output_current_block(Block* block) {
     }
     for (size_t i = 0; i < _result_window_columns.size(); ++i) {
-        block->insert({std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""});
+        SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
+        if (output_slot_desc->is_nullable() xor _agg_functions[i]->data_type()->is_nullable()) {
+            DCHECK(output_slot_desc->is_nullable() &&
+                   !_agg_functions[i]->data_type()->is_nullable());
+            block->insert({make_nullable(std::move(_result_window_columns[i])),
+                           make_nullable(_agg_functions[i]->data_type()), ""});
+        } else {
+            block->insert(
+                    {std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""});
+        }
     }
     _output_block_index++;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index c3e4e66984..23fc032a95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -945,12 +945,6 @@ public class Analyzer {
slotDescriptor.setIsNullable(true);
return;
}
-        for (Expr sourceExpr : slotDescriptor.getSourceExprs()) {
-            if (!sourceExpr.isNullable()) {
-                throw new VecNotImplException("The slot (" + slotDescriptor.toString()
-                        + ") could not be changed to nullable");
-            }
-        }
}
/**
diff --git a/regression-test/data/correctness/test_outer_join_with_window_function.out b/regression-test/data/correctness/test_outer_join_with_window_function.out
new file mode 100644
index 0000000000..e0d7861228
--- /dev/null
+++ b/regression-test/data/correctness/test_outer_join_with_window_function.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+abc	xyz	1577946288488507	1492704224	421001	421001	2020-01-19T11:15:21	9999-12-30 00:00:00	9999-12-30T00:00	-	-	-
+
diff --git a/regression-test/suites/correctness/test_outer_join_with_window_function.groovy b/regression-test/suites/correctness/test_outer_join_with_window_function.groovy
new file mode 100644
index 0000000000..ce6f79edf7
--- /dev/null
+++ b/regression-test/suites/correctness/test_outer_join_with_window_function.groovy
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_outer_join_with_window_function") {
+ sql """
+ drop table if exists dwd_online_detail;
+ """
+
+ sql """
+ CREATE TABLE `dwd_online_detail` (
+ `logout_time` datetime NOT NULL DEFAULT "9999-12-30 00:00:00",
+ `login_time` datetime NOT NULL DEFAULT "9999-12-30 00:00:00",
+ `game_code` varchar(50) NOT NULL DEFAULT "-",
+ `plat_code` varchar(50) NOT NULL DEFAULT "-",
+ `account` varchar(255) NOT NULL DEFAULT "-",
+ `playerid` varchar(255) NOT NULL DEFAULT "-",
+ `userid` varchar(255) NOT NULL DEFAULT "-",
+ `pid_code` varchar(50) NOT NULL DEFAULT "-",
+ `gid_code` varchar(50) NOT NULL DEFAULT "-",
+ `org_sid` int(11) NOT NULL DEFAULT "0",
+ `ct_sid` int(11) NOT NULL DEFAULT "0",
+ `next_login_time` datetime NOT NULL DEFAULT "9999-12-30 00:00:00"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`logout_time`, `login_time`, `game_code`, `plat_code`, `account`, `playerid`, `userid`)
+ PARTITION BY RANGE(`logout_time`)
+ (PARTITION p99991230 VALUES [('9999-12-30 00:00:00'), ('9999-12-31 00:00:00')))
+ DISTRIBUTED BY HASH(`game_code`, `plat_code`) BUCKETS 4
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "colocate_with" = "gp_group"
+ );
+ """
+
+ sql """
+ drop table if exists ods_logout;
+ """
+
+ sql """
+ CREATE TABLE `ods_logout` (
+ `day` date NULL COMMENT "",
+ `game` varchar(500) NULL COMMENT "",
+ `plat` varchar(500) NULL COMMENT "",
+ `dt` datetime NULL COMMENT "",
+ `time` bigint(20) NULL COMMENT "",
+ `sid` int(11) NULL COMMENT "",
+ `pid` varchar(500) NULL COMMENT "",
+ `gid` varchar(500) NULL COMMENT "",
+ `account` varchar(500) NULL COMMENT "",
+ `playerid` varchar(500) NULL COMMENT "",
+ `prop` varchar(500) NULL COMMENT "",
+ `p01` varchar(500) NULL COMMENT "",
+ `p02` varchar(500) NULL COMMENT "",
+ `p03` varchar(500) NULL COMMENT "",
+ `p04` varchar(500) NULL COMMENT "",
+ `p05` varchar(500) NULL COMMENT "",
+ `p06` varchar(500) NULL COMMENT "",
+ `p07` varchar(500) NULL COMMENT "",
+ `p08` varchar(500) NULL COMMENT "",
+ `p09` varchar(500) NULL COMMENT "",
+ `p10` varchar(500) NULL COMMENT "",
+ `p11` varchar(500) NULL COMMENT "",
+ `p12` varchar(500) NULL COMMENT "",
+ `p13` varchar(500) NULL COMMENT "",
+ `p14` varchar(500) NULL COMMENT "",
+ `p15` varchar(500) NULL COMMENT ""
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`day`, `game`, `plat`)
+ PARTITION BY RANGE(`day`)
+ (PARTITION p201907 VALUES [('2019-07-01'), ('2019-08-01')))
+ DISTRIBUTED BY HASH(`game`, `plat`) BUCKETS 4
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ drop table if exists dim_account_userid_mapping;
+ """
+
+ sql """
+ CREATE TABLE `dim_account_userid_mapping` (
+ `end_time` datetime NOT NULL DEFAULT "9999-12-30 00:00:00",
+ `start_time` datetime NOT NULL DEFAULT "9999-12-30 00:00:00",
+ `game_code` varchar(50) NOT NULL,
+ `plat_code` varchar(50) NOT NULL,
+ `userkey` varchar(255) NOT NULL,
+ `userid` varchar(255) NOT NULL,
+ `account` varchar(255) NOT NULL,
+ `pid_code` varchar(50) NOT NULL DEFAULT "-",
+ `gid_code` varchar(50) NOT NULL DEFAULT "-",
+ `region` varchar(50) NOT NULL DEFAULT "-"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`end_time`, `start_time`, `game_code`, `plat_code`, `userkey`)
+ PARTITION BY RANGE(`end_time`)
+ (PARTITION p20190705 VALUES [('2019-07-05 00:00:00'), ('2019-07-06 00:00:00')))
+ DISTRIBUTED BY HASH(`game_code`, `plat_code`) BUCKETS 4
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "colocate_with" = "gp_group"
+ );
+ """
+
+ sql """
+ drop table if exists ods_login;
+ """
+
+ sql """
+ CREATE TABLE `ods_login` (
+ `day` date NULL COMMENT "",
+ `game` varchar(500) NULL COMMENT "",
+ `plat` varchar(500) NULL COMMENT "",
+ `dt` datetime NULL COMMENT "",
+ `time` bigint(20) NULL COMMENT "",
+ `sid` int(11) NULL COMMENT "",
+ `pid` varchar(500) NULL COMMENT "",
+ `gid` varchar(500) NULL COMMENT "",
+ `account` varchar(500) NULL COMMENT "",
+ `playerid` varchar(500) NULL COMMENT "",
+ `prop` varchar(500) NULL COMMENT "",
+ `p01` varchar(500) NULL COMMENT "",
+ `p02` varchar(500) NULL COMMENT "",
+ `p03` varchar(500) NULL COMMENT "",
+ `p04` varchar(500) NULL COMMENT "",
+ `p05` varchar(500) NULL COMMENT "",
+ `p06` varchar(500) NULL COMMENT "",
+ `p07` varchar(500) NULL COMMENT "",
+ `p08` varchar(500) NULL COMMENT "",
+ `p09` varchar(500) NULL COMMENT "",
+ `p10` varchar(500) NULL COMMENT "",
+ `p11` varchar(500) NULL COMMENT "",
+ `p12` varchar(500) NULL COMMENT "",
+ `p13` varchar(500) NULL COMMENT "",
+ `p14` varchar(500) NULL COMMENT "",
+ `p15` varchar(500) NULL COMMENT ""
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`day`, `game`, `plat`)
+ COMMENT "login ods"
+ PARTITION BY RANGE(`day`)
+ (PARTITION p201803 VALUES [('2018-03-01'), ('2018-04-01')),
+ PARTITION p201804 VALUES [('2018-04-01'), ('2018-05-01')),
+ PARTITION p201805 VALUES [('2018-05-01'), ('2018-06-01')),
+ PARTITION p201806 VALUES [('2018-06-01'), ('2018-07-01')),
+ PARTITION p201807 VALUES [('2018-07-01'), ('2018-08-01')),
+ PARTITION p201808 VALUES [('2018-08-01'), ('2018-09-01')),
+ PARTITION p201809 VALUES [('2018-09-01'), ('2018-10-01')),
+ PARTITION p201810 VALUES [('2018-10-01'), ('2018-11-01')),
+ PARTITION p201811 VALUES [('2018-11-01'), ('2018-12-01')),
+ PARTITION p201812 VALUES [('2018-12-01'), ('2019-01-01')),
+ PARTITION p201901 VALUES [('2019-01-01'), ('2019-02-01')),
+ PARTITION p201902 VALUES [('2019-02-01'), ('2019-03-01')),
+ PARTITION p201903 VALUES [('2019-03-01'), ('2019-04-01')),
+ PARTITION p201904 VALUES [('2019-04-01'), ('2019-05-01')),
+ PARTITION p201905 VALUES [('2019-05-01'), ('2019-06-01')),
+ PARTITION p201906 VALUES [('2019-06-01'), ('2019-07-01')),
+ PARTITION p201907 VALUES [('2019-07-01'), ('2019-08-01')),
+ PARTITION p201908 VALUES [('2019-08-01'), ('2019-09-01')),
+ PARTITION p201909 VALUES [('2019-09-01'), ('2019-10-01')),
+ PARTITION p201910 VALUES [('2019-10-01'), ('2019-11-01')),
+ PARTITION p201911 VALUES [('2019-11-01'), ('2019-12-01')),
+ PARTITION p201912 VALUES [('2019-12-01'), ('2020-01-01')),
+ PARTITION p202001 VALUES [('2020-01-01'), ('2020-02-01')),
+ PARTITION p202002 VALUES [('2020-02-01'), ('2020-03-01')),
+ PARTITION p202003 VALUES [('2020-03-01'), ('2020-04-01')),
+ PARTITION p202004 VALUES [('2020-04-01'), ('2020-05-01')),
+ PARTITION p202005 VALUES [('2020-05-01'), ('2020-06-01')),
+ PARTITION p202006 VALUES [('2020-06-01'), ('2020-07-01')),
+ PARTITION p202007 VALUES [('2020-07-01'), ('2020-08-01')),
+ PARTITION p202008 VALUES [('2020-08-01'), ('2020-09-01')),
+ PARTITION p202009 VALUES [('2020-09-01'), ('2020-10-01')),
+ PARTITION p202010 VALUES [('2020-10-01'), ('2020-11-01')),
+ PARTITION p202011 VALUES [('2020-11-01'), ('2020-12-01')),
+ PARTITION p202012 VALUES [('2020-12-01'), ('2021-01-01')),
+ PARTITION p202101 VALUES [('2021-01-01'), ('2021-02-01')),
+ PARTITION p202102 VALUES [('2021-02-01'), ('2021-03-01')),
+ PARTITION p202103 VALUES [('2021-03-01'), ('2021-04-01')),
+ PARTITION p202104 VALUES [('2021-04-01'), ('2021-05-01')),
+ PARTITION p202105 VALUES [('2021-05-01'), ('2021-06-01')),
+ PARTITION p202106 VALUES [('2021-06-01'), ('2021-07-01')),
+ PARTITION p202107 VALUES [('2021-07-01'), ('2021-08-01')),
+ PARTITION p202108 VALUES [('2021-08-01'), ('2021-09-01')),
+ PARTITION p202109 VALUES [('2021-09-01'), ('2021-10-01')),
+ PARTITION p202110 VALUES [('2021-10-01'), ('2021-11-01')),
+ PARTITION p202111 VALUES [('2021-11-01'), ('2021-12-01')),
+ PARTITION p202112 VALUES [('2021-12-01'), ('2022-01-01')),
+ PARTITION p202201 VALUES [('2022-01-01'), ('2022-02-01')),
+ PARTITION p202202 VALUES [('2022-02-01'), ('2022-03-01')),
+ PARTITION p202203 VALUES [('2022-03-01'), ('2022-04-01')),
+ PARTITION p202204 VALUES [('2022-04-01'), ('2022-05-01')),
+ PARTITION p202205 VALUES [('2022-05-01'), ('2022-06-01')),
+ PARTITION p202206 VALUES [('2022-06-01'), ('2022-07-01')),
+ PARTITION p202207 VALUES [('2022-07-01'), ('2022-08-01')),
+ PARTITION p202208 VALUES [('2022-08-01'), ('2022-09-01')),
+ PARTITION p202209 VALUES [('2022-09-01'), ('2022-10-01')))
+ DISTRIBUTED BY HASH(`game`, `plat`) BUCKETS 4
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "dynamic_partition.enable" = "true",
+ "dynamic_partition.time_unit" = "MONTH",
+ "dynamic_partition.time_zone" = "Asia/Shanghai",
+ "dynamic_partition.start" = "-2147483648",
+ "dynamic_partition.end" = "3",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.replication_allocation" = "tag.location.default: 1",
+ "dynamic_partition.buckets" = "4",
+ "dynamic_partition.create_history_partition" = "true",
+ "dynamic_partition.history_partition_num" = "50",
+ "dynamic_partition.hot_partition_num" = "2",
+ "dynamic_partition.reserved_history_periods" = "NULL",
+ "dynamic_partition.start_day_of_month" = "1",
+ "in_memory" = "false",
+ "storage_format" = "V2");
+ """
+
+ sql """
+ insert into ods_logout(day, game, plat, playerid, dt) values('2019-07-05', 'abc', 'xyz', '1136638398824557', '2019-07-05 00:00:00');
+ """
+
+ sql """
+ insert into dwd_online_detail(game_code, plat_code, playerid, account, org_sid, ct_sid, login_time, logout_time, pid_code, gid_code)
+ values('abc', 'xyz', '1577946288488507', '1492704224', '421001', '421001', '2020-01-19 11:15:21', '9999-12-30 00:00:00', '-', '-');
+ """
+
+ qt_select """
+ SELECT online_detail.game_code, online_detail.plat_code, online_detail.playerid, online_detail.account, online_detail.org_sid, online_detail.ct_sid,
+     online_detail.login_time, if(online_detail.logout_time='9999-12-30 00:00:00', coalesce(logout.dt, online_detail.next_login_time), online_detail.logout_time) logout_time,
+     online_detail.next_login_time, online_detail.userid, online_detail.pid_code, online_detail.gid_code
+ from
+     (select tmp.game_code, tmp.plat_code, tmp.playerid, tmp.account, tmp.org_sid, tmp.ct_sid, tmp.login_time, tmp.logout_time,
+         LEAD(tmp.login_time, 1, '9999-12-30 00:00:00') over (partition by tmp.game_code, tmp.plat_code, tmp.playerid order by tmp.login_time) next_login_time,
+         COALESCE(mp.userid, '-') userid, COALESCE(mp.pid_code, '-') pid_code, COALESCE(mp.gid_code, '-') gid_code
+     from
+         (select * from dim_account_userid_mapping
+          where start_time < convert_tz(date_add('2019-07-05 00:00:00', INTERVAL 1 day), 'Asia/Shanghai', 'Asia/Shanghai')
+            and end_time >= convert_tz('2019-07-05 00:00:00', 'Asia/Shanghai', 'Asia/Shanghai')
+            and game_code = 'abc' and plat_code = 'xyz'
+         ) mp
+         right join
+         (select *, concat_ws('_', pid_code, gid_code, account) userkey from
+             (select game_code, plat_code, playerid, account, org_sid, ct_sid, login_time, logout_time, pid_code, gid_code
+              from dwd_online_detail where logout_time = '9999-12-30 00:00:00' and game_code = 'abc' and plat_code = 'xyz'
+              union all
+              select game game_code, plat plat_code, playerid, account, sid org_sid, cast(p08 as int) ct_sid, dt login_time, '9999-12-30 00:00:00' logout_time, pid pid_code, gid gid_code
+              from ods_login
+              where game = 'abc' and `plat` = 'xyz'
+                AND dt BETWEEN convert_tz('2019-07-05 00:00:00', 'Asia/Shanghai', 'Asia/Shanghai')
+                        and convert_tz('2019-07-05 23:59:59', 'Asia/Shanghai', 'Asia/Shanghai')
+                and day BETWEEN date_sub('2019-07-05', INTERVAL 1 DAY) and date_add('2019-07-05', INTERVAL 1 DAY)
+              group by 1,2,3,4,5,6,7,8,9,10
+             ) t
+         ) tmp
+         on mp.game_code = tmp.game_code and mp.plat_code = tmp.plat_code and mp.userkey = tmp.userkey
+            and tmp.login_time >= mp.start_time and tmp.login_time < mp.end_time
+     ) online_detail
+     left JOIN
+     (select day, game game_code, plat plat_code, playerid, dt
+      from ods_logout dlt
+      where game = 'abc' and `plat` = 'xyz'
+        and dt BETWEEN convert_tz('2019-07-05 00:00:00', 'Asia/Shanghai', 'Asia/Shanghai')
+                and convert_tz('2019-07-05 23:59:59', 'Asia/Shanghai', 'Asia/Shanghai')
+        and day BETWEEN date_sub('2019-07-05', INTERVAL 1 DAY) and date_add('2019-07-05', INTERVAL 1 DAY)
+      group by 1,2,3,4,5
+     ) logout
+     on online_detail.game_code = logout.game_code and online_detail.plat_code = logout.plat_code
+        and online_detail.playerid = logout.playerid
+        and logout.dt > online_detail.login_time and logout.dt < online_detail.next_login_time
+ union all
+ select game_code, plat_code, playerid, account, org_sid, ct_sid, login_time, logout_time, next_login_time, userid, pid_code, gid_code
+ from dwd_online_detail
+ where logout_time BETWEEN convert_tz('2019-07-05 00:00:00', 'Asia/Shanghai', 'Asia/Shanghai')
+       and convert_tz('2019-07-05 23:59:59', 'Asia/Shanghai', 'Asia/Shanghai')
+   and not (game_code = 'abc' and `plat_code` = 'xyz');
+ """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]