[ 
https://issues.apache.org/jira/browse/KYLIN-4442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085005#comment-17085005
 ] 

Sonu Singh commented on KYLIN-4442:
-----------------------------------

Hi [~hit_lacus]

Please find below the HiveProducer.java code.

 

public class HiveProducer
{
 private static final Logger logger = 
LoggerFactory.getLogger(HiveProducer.class);
 private static final int CACHE_MAX_SIZE = 10;
 private final HiveConf hiveConf;
 private FileSystem fs;
 private final LoadingCache<Pair<String, String>, Pair<String, 
List<FieldSchema>>> tableFieldSchemaCache;
 private final String contentFilePrefix;
 private String metricType;
 private String prePartitionPath;
 private Path curPartitionContentPath;
 private int id = 0;
 private FSDataOutputStream fout;
 
 public HiveProducer(String metricType, Properties props)
 throws Exception
 {
 this(metricType, props, new HiveConf());
 }
 
 HiveProducer(String metricType, Properties props, HiveConf hiveConfig)
 throws Exception
 {
 this.metricType = metricType;
 this.hiveConf = hiveConfig;
 for (Map.Entry<Object, Object> e : props.entrySet()) {
 this.hiveConf.set(e.getKey().toString(), e.getValue().toString());
 }
 this.fs = FileSystem.get(this.hiveConf);
 
 this.tableFieldSchemaCache = CacheBuilder.newBuilder().removalListener(new 
RemovalListener()
 {
 public void onRemoval(RemovalNotification<Pair<String, String>, Pair<String, 
List<FieldSchema>>> notification)
 {
 HiveProducer.logger.info("Field schema with table " + 
 ActiveReservoirReporter.getTableName((Pair)notification.getKey()) + " is 
removed due to " + notification
 .getCause());
 }
 })
 
 .maximumSize(10L)
 .build(new CacheLoader()
 {
 public Pair<String, List<FieldSchema>> load(Pair<String, String> tableName)
 throws Exception
 {
 HiveMetaStoreClient metaStoreClient = new 
HiveMetaStoreClient(HiveProducer.this.hiveConf);
 
 String tableLocation = metaStoreClient.getTable((String)tableName.getFirst(), 
(String)tableName.getSecond()).getSd().getLocation();
 List<FieldSchema> fields = 
metaStoreClient.getFields((String)tableName.getFirst(), 
 (String)tableName.getSecond());
 metaStoreClient.close();
 return new Pair(tableLocation, fields);
 }
 });
 String hostName;
 try
 {
 hostName = InetAddress.getLocalHost().getHostName();
 }
 catch (UnknownHostException e)
 {
 String hostName;
 hostName = "UNKNOWN";
 }
 this.contentFilePrefix = (hostName + "-" + System.currentTimeMillis() + 
"-part-");
 }
 
 public void close()
 {
 this.tableFieldSchemaCache.cleanUp();
 }
 
 public void send(Record record)
 throws Exception
 {
 logger.debug("send record: " + record.getType());
 HiveProducerRecord hiveRecord = convertTo(record);
 logger.debug("send hiveRecord: " + hiveRecord.toString());
 write(hiveRecord.key(), Lists.newArrayList(new HiveProducerRecord[] \{ 
hiveRecord }));
 logger.debug("send write: " + hiveRecord.key());
 }
 
 public void send(List<Record> recordList)
 throws Exception
 {
 logger.debug("send final List: " + recordList.size());
 Map<HiveProducerRecord.RecordKey, List<HiveProducerRecord>> recordMap = 
Maps.newHashMap();
 for (Record record : recordList)
 {
 HiveProducerRecord hiveRecord = convertTo(record);
 logger.debug(" fhiveRecord: " + hiveRecord.toString());
 if (recordMap.get(hiveRecord.key()) == null)
 {
 logger.debug(" send(final List: hiveRecord.key(): " + hiveRecord.key());
 recordMap.put(hiveRecord.key(), Lists.newLinkedList());
 }
 ((List)recordMap.get(hiveRecord.key())).add(hiveRecord);
 logger.debug(" send(final List: recordMap: " + recordMap.size());
 }
 for (Map.Entry<HiveProducerRecord.RecordKey, List<HiveProducerRecord>> entry : 
recordMap.entrySet())
 {
 logger.debug(" for (Map.Entry<RecordKey: " + entry.getKey());
 write((HiveProducerRecord.RecordKey)entry.getKey(), 
(Iterable)entry.getValue());
 }
 }
 
 private void write(HiveProducerRecord.RecordKey recordKey, 
Iterable<HiveProducerRecord> recordItr)
 throws Exception
 {
 logger.debug("write entry: " + recordKey.table());
 
 String tableLocation = (String)((Pair)this.tableFieldSchemaCache.get(new 
Pair(recordKey.database(), recordKey.table()))).getFirst();
 logger.debug(" Step 1: write tableLocation: " + tableLocation);
 StringBuilder sb = new StringBuilder();
 sb.append(tableLocation);
 for (Map.Entry<String, String> e : recordKey.partition().entrySet())
 {
 sb.append("/");
 sb.append(((String)e.getKey()).toLowerCase(Locale.ROOT));
 sb.append("=");
 sb.append((String)e.getValue());
 }
 Path partitionPath = new Path(sb.toString());
 logger.debug("Step 1: write partitionPath: " + sb.toString());
 if ((partitionPath.toUri().getScheme() != null) && 
(!partitionPath.toUri().toString().startsWith(this.fs.getUri().toString())))
 {
 logger.debug("Step 1: write partitionPath.toUri().getScheme(): " + 
partitionPath.toUri().getScheme());
 this.fs.close();
 this.fs = partitionPath.getFileSystem(this.hiveConf);
 logger.debug("Step 1: write partitionPath.toUri().getScheme(): " + 
this.fs.getName());
 }
 if (!this.fs.exists(partitionPath))
 {
 logger.debug("Step 2: create partition for hive: " + partitionPath);
 StringBuilder hql = new StringBuilder();
 hql.append("ALTER TABLE ");
 hql.append(recordKey.database() + "." + recordKey.table());
 hql.append(" ADD IF NOT EXISTS PARTITION (");
 boolean ifFirst = true;
 for (Map.Entry<String, String> e : recordKey.partition().entrySet())
 {
 if (ifFirst) {
 ifFirst = false;
 } else {
 hql.append(",");
 }
 hql.append(((String)e.getKey()).toLowerCase(Locale.ROOT));
 hql.append("='" + (String)e.getValue() + "'");
 }
 hql.append(")");
 logger.debug("create partition by {}.", hql);
 Driver driver = new Driver(this.hiveConf);
 logger.debug("Step 2: create partition for hive: driver: " + driver);
 CliSessionState session = new CliSessionState(this.hiveConf);
 logger.debug("Step 2: create partition for hive: session: " + session);
 SessionState.start(session);
 logger.debug("Step 2: create partition for hive: start session ");
 CommandProcessorResponse res = driver.run(hql.toString());
 logger.debug("Step 2: create partition for hive:res: " + res);
 if (res.getResponseCode() != 0)
 {
 logger.warn("Fail to add partition. HQL: {}; Cause by: {}", hql
 .toString(), res
 .toString());
 logger.debug("Fail to add partition. HQL: {}; Cause by: {}", hql
 .toString(), res
 .toString());
 }
 session.close();
 logger.debug("Step 2: create partition for hive: session close ");
 driver.close();
 logger.debug("Step 2: create partition for hive: driver close ");
 }
 int nRetry;
 if ((this.fout == null) || (this.prePartitionPath == null) || 
(this.prePartitionPath.compareTo(partitionPath.toString()) != 0))
 {
 logger.debug("Step 3: create path for new partition: " + 
partitionPath.toString());
 if (this.fout != null)
 {
 logger.debug("Flush output stream of previous partition path {}. Using a new 
one {}. ", this.prePartitionPath, partitionPath);
 closeFout();
 }
 Path partitionContentPath = new Path(partitionPath, this.contentFilePrefix + 
String.format(Locale.ROOT, "%04d", new Object[] \{ Integer.valueOf(this.id) }));
 logger.debug("Try to use new partition content path: {} for metric: {}", 
partitionContentPath, this.metricType);
 if (!this.fs.exists(partitionContentPath))
 {
 logger.debug("Step 3: create path for new partition: if condition " + 
partitionPath.toString());
 nRetry = 0;
 while ((!this.fs.createNewFile(partitionContentPath)) && (nRetry++ < 5) && 
 (!this.fs.exists(partitionContentPath))) {
 Thread.sleep(500L * nRetry);
 }
 if (!this.fs.exists(partitionContentPath)) {
 throw new IllegalStateException("Fail to create HDFS file: " + 
partitionContentPath + " after " + nRetry + " retries");
 }
 }
 logger.debug("Step 3: create HDFS file");
 
 this.fout = this.fs.append(partitionContentPath);
 logger.debug("Step 3: fout: " + this.fout);
 this.prePartitionPath = partitionPath.toString();
 logger.debug("Step 3: prePartitionPath: " + this.prePartitionPath);
 this.curPartitionContentPath = partitionContentPath;
 logger.debug("Step 3: curPartitionContentPath: " + 
this.curPartitionContentPath);
 this.id = ((this.id + 1) % 10);
 logger.debug("Step 3: id: " + this.id);
 }
 try
 {
 logger.debug("Step 4");
 int count = 0;
 for (HiveProducerRecord elem : recordItr)
 {
 logger.debug("Step 4: elem: " + elem.toString());
 this.fout.writeBytes(elem.valueToString() + "\n");
 logger.debug("Step 4: fout.writeByte: " + this.fout);
 count++;
 }
 logger.debug("Success to write {} metrics ({}) to file {}", new Object[] \{ 
Integer.valueOf(count), this.metricType, this.curPartitionContentPath });
 }
 catch (IOException e)
 {
 logger.error("Fails to write metrics(" + this.metricType + ") to file " + 
this.curPartitionContentPath.toString() + " due to ", e);
 
 closeFout();
 }
 }
 
 private void closeFout()
 {
 if (this.fout != null) {
 try
 {
 logger.debug("Flush output stream {}.", this.curPartitionContentPath);
 this.fout.close();
 }
 catch (Exception e)
 {
 logger.error("Close the path: " + this.curPartitionContentPath + " failed", e);
 if ((this.fs instanceof DistributedFileSystem))
 {
 DistributedFileSystem hdfs = (DistributedFileSystem)this.fs;
 try
 {
 boolean bool = hdfs.recoverLease(this.curPartitionContentPath);
 }
 catch (Exception e1)
 {
 logger.error("Recover lease for path: " + this.curPartitionContentPath + " 
failed", e1);
 }
 }
 }
 }
 this.fout = null;
 }
 
 private HiveProducerRecord convertTo(Record record)
 throws Exception
 {
 logger.debug("convertTo: " + record.getType());
 Map<String, Object> rawValue = record.getValueRaw();
 logger.debug("convertTo rawValue: " + rawValue.toString());
 
 Map<String, String> partitionKVs = Maps.newHashMapWithExpectedSize(1);
 logger.debug("convertTo partitionKVs: " + partitionKVs.toString());
 partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), rawValue
 .get(TimePropertyEnum.DAY_DATE.toString()).toString());
 logger.debug("convertTo partitionKVs value: " + 
(String)partitionKVs.get(TimePropertyEnum.DAY_DATE.toString()));
 return 
parseToHiveProducerRecord(HiveReservoirReporter.getTableFromSubject(record.getType()),
 partitionKVs, rawValue);
 }
 
 public HiveProducerRecord parseToHiveProducerRecord(String tableName, 
Map<String, String> partitionKVs, Map<String, Object> rawValue)
 throws Exception
 {
 logger.debug("parseToHiveProducerRecord: " + tableName);
 Pair<String, String> tableNameSplits = 
ActiveReservoirReporter.getTableNameSplits(tableName);
 logger.debug("parseToHiveProducerRecord tableNameSplits: " + 
(String)tableNameSplits.getFirst() + " " + (String)tableNameSplits.getSecond());
 List<FieldSchema> fields = 
(List)((Pair)this.tableFieldSchemaCache.get(tableNameSplits)).getSecond();
 logger.debug("parseToHiveProducerRecord fields: " + fields.toString());
 List<Object> columnValues = Lists.newArrayListWithExpectedSize(fields.size());
 logger.debug("parseToHiveProducerRecord columnValues: " + 
columnValues.toString());
 for (FieldSchema fieldSchema : fields)
 {
 logger.debug("parseToHiveProducerRecord fieldSchema: " + 
fieldSchema.getName());
 columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase(Locale.ROOT)));
 }
 logger.debug("parseToHiveProducerRecord return");
 return new HiveProducerRecord((String)tableNameSplits.getFirst(), 
(String)tableNameSplits.getSecond(), partitionKVs, columnValues);
 }
}

 

> Kylin System Cube Hive tables not able to update 
> -------------------------------------------------
>
>                 Key: KYLIN-4442
>                 URL: https://issues.apache.org/jira/browse/KYLIN-4442
>             Project: Kylin
>          Issue Type: Bug
>    Affects Versions: v3.0.0
>            Reporter: Sonu Singh
>            Priority: Blocker
>         Attachments: kylin-metrics-reporter-hive-3.1.0-SNAPSHOT.jar
>
>
> The Kylin System Cube Hive table is not being updated. I am using the Kylin
> hadoop3 codebase and running it on vanilla Hadoop 3.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to