[
https://issues.apache.org/jira/browse/KYLIN-4442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085005#comment-17085005
]
Sonu Singh commented on KYLIN-4442:
-----------------------------------
Hi [~hit_lacus]
Please find below the HiveProducer.java code.
public class HiveProducer
{
private static final Logger logger =
LoggerFactory.getLogger(HiveProducer.class);
private static final int CACHE_MAX_SIZE = 10;
private final HiveConf hiveConf;
private FileSystem fs;
private final LoadingCache<Pair<String, String>, Pair<String,
List<FieldSchema>>> tableFieldSchemaCache;
private final String contentFilePrefix;
private String metricType;
private String prePartitionPath;
private Path curPartitionContentPath;
private int id = 0;
private FSDataOutputStream fout;
public HiveProducer(String metricType, Properties props)
throws Exception
{
this(metricType, props, new HiveConf());
}
HiveProducer(String metricType, Properties props, HiveConf hiveConfig)
throws Exception
{
this.metricType = metricType;
this.hiveConf = hiveConfig;
for (Map.Entry<Object, Object> e : props.entrySet()) {
this.hiveConf.set(e.getKey().toString(), e.getValue().toString());
}
this.fs = FileSystem.get(this.hiveConf);
this.tableFieldSchemaCache = CacheBuilder.newBuilder().removalListener(new
RemovalListener()
{
public void onRemoval(RemovalNotification<Pair<String, String>, Pair<String,
List<FieldSchema>>> notification)
{
HiveProducer.logger.info("Field schema with table " +
ActiveReservoirReporter.getTableName((Pair)notification.getKey()) + " is
removed due to " + notification
.getCause());
}
})
.maximumSize(10L)
.build(new CacheLoader()
{
public Pair<String, List<FieldSchema>> load(Pair<String, String> tableName)
throws Exception
{
HiveMetaStoreClient metaStoreClient = new
HiveMetaStoreClient(HiveProducer.this.hiveConf);
String tableLocation = metaStoreClient.getTable((String)tableName.getFirst(),
(String)tableName.getSecond()).getSd().getLocation();
List<FieldSchema> fields =
metaStoreClient.getFields((String)tableName.getFirst(),
(String)tableName.getSecond());
metaStoreClient.close();
return new Pair(tableLocation, fields);
}
});
String hostName;
try
{
hostName = InetAddress.getLocalHost().getHostName();
}
catch (UnknownHostException e)
{
String hostName;
hostName = "UNKNOWN";
}
this.contentFilePrefix = (hostName + "-" + System.currentTimeMillis() +
"-part-");
}
public void close()
{
this.tableFieldSchemaCache.cleanUp();
}
public void send(Record record)
throws Exception
{
logger.debug("send record: " + record.getType());
HiveProducerRecord hiveRecord = convertTo(record);
logger.debug("send hiveRecord: " + hiveRecord.toString());
write(hiveRecord.key(), Lists.newArrayList(new HiveProducerRecord[] \{
hiveRecord }));
logger.debug("send write: " + hiveRecord.key());
}
public void send(List<Record> recordList)
throws Exception
{
logger.debug("send final List: " + recordList.size());
Map<HiveProducerRecord.RecordKey, List<HiveProducerRecord>> recordMap =
Maps.newHashMap();
for (Record record : recordList)
{
HiveProducerRecord hiveRecord = convertTo(record);
logger.debug(" fhiveRecord: " + hiveRecord.toString());
if (recordMap.get(hiveRecord.key()) == null)
{
logger.debug(" send(final List: hiveRecord.key(): " + hiveRecord.key());
recordMap.put(hiveRecord.key(), Lists.newLinkedList());
}
((List)recordMap.get(hiveRecord.key())).add(hiveRecord);
logger.debug(" send(final List: recordMap: " + recordMap.size());
}
for (Map.Entry<HiveProducerRecord.RecordKey, List<HiveProducerRecord>> entry :
recordMap.entrySet())
{
logger.debug(" for (Map.Entry<RecordKey: " + entry.getKey());
write((HiveProducerRecord.RecordKey)entry.getKey(),
(Iterable)entry.getValue());
}
}
private void write(HiveProducerRecord.RecordKey recordKey,
Iterable<HiveProducerRecord> recordItr)
throws Exception
{
logger.debug("write entry: " + recordKey.table());
String tableLocation = (String)((Pair)this.tableFieldSchemaCache.get(new
Pair(recordKey.database(), recordKey.table()))).getFirst();
logger.debug(" Step 1: write tableLocation: " + tableLocation);
StringBuilder sb = new StringBuilder();
sb.append(tableLocation);
for (Map.Entry<String, String> e : recordKey.partition().entrySet())
{
sb.append("/");
sb.append(((String)e.getKey()).toLowerCase(Locale.ROOT));
sb.append("=");
sb.append((String)e.getValue());
}
Path partitionPath = new Path(sb.toString());
logger.debug("Step 1: write partitionPath: " + sb.toString());
if ((partitionPath.toUri().getScheme() != null) &&
(!partitionPath.toUri().toString().startsWith(this.fs.getUri().toString())))
{
logger.debug("Step 1: write partitionPath.toUri().getScheme(): " +
partitionPath.toUri().getScheme());
this.fs.close();
this.fs = partitionPath.getFileSystem(this.hiveConf);
logger.debug("Step 1: write partitionPath.toUri().getScheme(): " +
this.fs.getName());
}
if (!this.fs.exists(partitionPath))
{
logger.debug("Step 2: create partition for hive: " + partitionPath);
StringBuilder hql = new StringBuilder();
hql.append("ALTER TABLE ");
hql.append(recordKey.database() + "." + recordKey.table());
hql.append(" ADD IF NOT EXISTS PARTITION (");
boolean ifFirst = true;
for (Map.Entry<String, String> e : recordKey.partition().entrySet())
{
if (ifFirst) {
ifFirst = false;
} else {
hql.append(",");
}
hql.append(((String)e.getKey()).toLowerCase(Locale.ROOT));
hql.append("='" + (String)e.getValue() + "'");
}
hql.append(")");
logger.debug("create partition by {}.", hql);
Driver driver = new Driver(this.hiveConf);
logger.debug("Step 2: create partition for hive: driver: " + driver);
CliSessionState session = new CliSessionState(this.hiveConf);
logger.debug("Step 2: create partition for hive: session: " + session);
SessionState.start(session);
logger.debug("Step 2: create partition for hive: start session ");
CommandProcessorResponse res = driver.run(hql.toString());
logger.debug("Step 2: create partition for hive:res: " + res);
if (res.getResponseCode() != 0)
{
logger.warn("Fail to add partition. HQL: {}; Cause by: {}", hql
.toString(), res
.toString());
logger.debug("Fail to add partition. HQL: {}; Cause by: {}", hql
.toString(), res
.toString());
}
session.close();
logger.debug("Step 2: create partition for hive: session close ");
driver.close();
logger.debug("Step 2: create partition for hive: driver close ");
}
int nRetry;
if ((this.fout == null) || (this.prePartitionPath == null) ||
(this.prePartitionPath.compareTo(partitionPath.toString()) != 0))
{
logger.debug("Step 3: create path for new partition: " +
partitionPath.toString());
if (this.fout != null)
{
logger.debug("Flush output stream of previous partition path {}. Using a new
one {}. ", this.prePartitionPath, partitionPath);
closeFout();
}
Path partitionContentPath = new Path(partitionPath, this.contentFilePrefix +
String.format(Locale.ROOT, "%04d", new Object[] \{ Integer.valueOf(this.id) }));
logger.debug("Try to use new partition content path: {} for metric: {}",
partitionContentPath, this.metricType);
if (!this.fs.exists(partitionContentPath))
{
logger.debug("Step 3: create path for new partition: if condition " +
partitionPath.toString());
nRetry = 0;
while ((!this.fs.createNewFile(partitionContentPath)) && (nRetry++ < 5) &&
(!this.fs.exists(partitionContentPath))) {
Thread.sleep(500L * nRetry);
}
if (!this.fs.exists(partitionContentPath)) {
throw new IllegalStateException("Fail to create HDFS file: " +
partitionContentPath + " after " + nRetry + " retries");
}
}
logger.debug("Step 3: create HDFS file");
this.fout = this.fs.append(partitionContentPath);
logger.debug("Step 3: fout: " + this.fout);
this.prePartitionPath = partitionPath.toString();
logger.debug("Step 3: prePartitionPath: " + this.prePartitionPath);
this.curPartitionContentPath = partitionContentPath;
logger.debug("Step 3: curPartitionContentPath: " +
this.curPartitionContentPath);
this.id = ((this.id + 1) % 10);
logger.debug("Step 3: id: " + this.id);
}
try
{
logger.debug("Step 4");
int count = 0;
for (HiveProducerRecord elem : recordItr)
{
logger.debug("Step 4: elem: " + elem.toString());
this.fout.writeBytes(elem.valueToString() + "\n");
logger.debug("Step 4: fout.writeByte: " + this.fout);
count++;
}
logger.debug("Success to write {} metrics ({}) to file {}", new Object[] \{
Integer.valueOf(count), this.metricType, this.curPartitionContentPath });
}
catch (IOException e)
{
logger.error("Fails to write metrics(" + this.metricType + ") to file " +
this.curPartitionContentPath.toString() + " due to ", e);
closeFout();
}
}
private void closeFout()
{
if (this.fout != null) {
try
{
logger.debug("Flush output stream {}.", this.curPartitionContentPath);
this.fout.close();
}
catch (Exception e)
{
logger.error("Close the path: " + this.curPartitionContentPath + " failed", e);
if ((this.fs instanceof DistributedFileSystem))
{
DistributedFileSystem hdfs = (DistributedFileSystem)this.fs;
try
{
boolean bool = hdfs.recoverLease(this.curPartitionContentPath);
}
catch (Exception e1)
{
logger.error("Recover lease for path: " + this.curPartitionContentPath + "
failed", e1);
}
}
}
}
this.fout = null;
}
private HiveProducerRecord convertTo(Record record)
throws Exception
{
logger.debug("convertTo: " + record.getType());
Map<String, Object> rawValue = record.getValueRaw();
logger.debug("convertTo rawValue: " + rawValue.toString());
Map<String, String> partitionKVs = Maps.newHashMapWithExpectedSize(1);
logger.debug("convertTo partitionKVs: " + partitionKVs.toString());
partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), rawValue
.get(TimePropertyEnum.DAY_DATE.toString()).toString());
logger.debug("convertTo partitionKVs value: " +
(String)partitionKVs.get(TimePropertyEnum.DAY_DATE.toString()));
return
parseToHiveProducerRecord(HiveReservoirReporter.getTableFromSubject(record.getType()),
partitionKVs, rawValue);
}
public HiveProducerRecord parseToHiveProducerRecord(String tableName,
Map<String, String> partitionKVs, Map<String, Object> rawValue)
throws Exception
{
logger.debug("parseToHiveProducerRecord: " + tableName);
Pair<String, String> tableNameSplits =
ActiveReservoirReporter.getTableNameSplits(tableName);
logger.debug("parseToHiveProducerRecord tableNameSplits: " +
(String)tableNameSplits.getFirst() + " " + (String)tableNameSplits.getSecond());
List<FieldSchema> fields =
(List)((Pair)this.tableFieldSchemaCache.get(tableNameSplits)).getSecond();
logger.debug("parseToHiveProducerRecord fields: " + fields.toString());
List<Object> columnValues = Lists.newArrayListWithExpectedSize(fields.size());
logger.debug("parseToHiveProducerRecord columnValues: " +
columnValues.toString());
for (FieldSchema fieldSchema : fields)
{
logger.debug("parseToHiveProducerRecord fieldSchema: " +
fieldSchema.getName());
columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase(Locale.ROOT)));
}
logger.debug("parseToHiveProducerRecord return");
return new HiveProducerRecord((String)tableNameSplits.getFirst(),
(String)tableNameSplits.getSecond(), partitionKVs, columnValues);
}
}
> Kylin System Cube Hive tables not able to update
> -------------------------------------------------
>
> Key: KYLIN-4442
> URL: https://issues.apache.org/jira/browse/KYLIN-4442
> Project: Kylin
> Issue Type: Bug
> Affects Versions: v3.0.0
> Reporter: Sonu Singh
> Priority: Blocker
> Attachments: kylin-metrics-reporter-hive-3.1.0-SNAPSHOT.jar
>
>
> The Kylin System Cube Hive table is not being updated. I am using the Kylin
> hadoop3 codebase and running on vanilla Hadoop 3.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)